Skip to content

Commit a61eb17

Browse files
committed
[Fix #822] TaskExecutorFactory to accept WorkflowDefinition
Signed-off-by: fjtirado <[email protected]>
1 parent 4989629 commit a61eb17

21 files changed

+104
-247
lines changed

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaForExecutorBuilder.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,23 @@
1818
import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.safeObject;
1919

2020
import io.serverlessworkflow.api.types.ForTask;
21-
import io.serverlessworkflow.api.types.Workflow;
2221
import io.serverlessworkflow.api.types.func.ForTaskFunction;
2322
import io.serverlessworkflow.api.types.func.LoopPredicateIndex;
2423
import io.serverlessworkflow.api.types.func.TypedFunction;
25-
import io.serverlessworkflow.impl.WorkflowApplication;
24+
import io.serverlessworkflow.impl.WorkflowDefinition;
2625
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2726
import io.serverlessworkflow.impl.WorkflowPredicate;
2827
import io.serverlessworkflow.impl.WorkflowValueResolver;
2928
import io.serverlessworkflow.impl.executors.ForExecutor.ForExecutorBuilder;
3029
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
31-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3230
import java.util.Collection;
3331
import java.util.Optional;
3432

3533
public class JavaForExecutorBuilder extends ForExecutorBuilder {
3634

3735
protected JavaForExecutorBuilder(
38-
WorkflowMutablePosition position,
39-
ForTask task,
40-
Workflow workflow,
41-
WorkflowApplication application,
42-
ResourceLoader resourceLoader) {
43-
super(position, task, workflow, application, resourceLoader);
36+
WorkflowMutablePosition position, ForTask task, WorkflowDefinition definition) {
37+
super(position, task, definition);
4438
}
4539

4640
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaListenExecutorBuilder.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,18 @@
1919

2020
import io.serverlessworkflow.api.types.ListenTask;
2121
import io.serverlessworkflow.api.types.Until;
22-
import io.serverlessworkflow.api.types.Workflow;
2322
import io.serverlessworkflow.api.types.func.UntilPredicate;
24-
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import io.serverlessworkflow.impl.WorkflowDefinition;
2524
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2625
import io.serverlessworkflow.impl.WorkflowPredicate;
2726
import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder;
2827
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
29-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3028

3129
public class JavaListenExecutorBuilder extends ListenExecutorBuilder {
3230

3331
protected JavaListenExecutorBuilder(
34-
WorkflowMutablePosition position,
35-
ListenTask task,
36-
Workflow workflow,
37-
WorkflowApplication application,
38-
ResourceLoader resourceLoader) {
39-
super(position, task, workflow, application, resourceLoader);
32+
WorkflowMutablePosition position, ListenTask task, WorkflowDefinition definition) {
33+
super(position, task, definition);
4034
}
4135

4236
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaSwitchExecutorBuilder.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,19 @@
1919

2020
import io.serverlessworkflow.api.types.SwitchCase;
2121
import io.serverlessworkflow.api.types.SwitchTask;
22-
import io.serverlessworkflow.api.types.Workflow;
2322
import io.serverlessworkflow.api.types.func.SwitchCaseFunction;
24-
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import io.serverlessworkflow.impl.WorkflowDefinition;
2524
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2625
import io.serverlessworkflow.impl.WorkflowPredicate;
2726
import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder;
2827
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
29-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3028
import java.util.Optional;
3129

3230
public class JavaSwitchExecutorBuilder extends SwitchExecutorBuilder {
3331

3432
protected JavaSwitchExecutorBuilder(
35-
WorkflowMutablePosition position,
36-
SwitchTask task,
37-
Workflow workflow,
38-
WorkflowApplication application,
39-
ResourceLoader resourceLoader) {
40-
super(position, task, workflow, application, resourceLoader);
33+
WorkflowMutablePosition position, SwitchTask task, WorkflowDefinition definition) {
34+
super(position, task, definition);
4135
}
4236

4337
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaTaskExecutorFactory.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,23 @@
1717

1818
import io.serverlessworkflow.api.types.Task;
1919
import io.serverlessworkflow.api.types.TaskBase;
20-
import io.serverlessworkflow.api.types.Workflow;
21-
import io.serverlessworkflow.impl.WorkflowApplication;
20+
import io.serverlessworkflow.impl.WorkflowDefinition;
2221
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2322
import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory;
2423
import io.serverlessworkflow.impl.executors.TaskExecutorBuilder;
25-
import io.serverlessworkflow.impl.resources.ResourceLoader;
2624

2725
public class JavaTaskExecutorFactory extends DefaultTaskExecutorFactory {
2826

2927
public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
30-
WorkflowMutablePosition position,
31-
Task task,
32-
Workflow workflow,
33-
WorkflowApplication application,
34-
ResourceLoader resourceLoader) {
28+
WorkflowMutablePosition position, Task task, WorkflowDefinition definition) {
3529
if (task.getForTask() != null) {
36-
return new JavaForExecutorBuilder(
37-
position, task.getForTask(), workflow, application, resourceLoader);
30+
return new JavaForExecutorBuilder(position, task.getForTask(), definition);
3831
} else if (task.getSwitchTask() != null) {
39-
return new JavaSwitchExecutorBuilder(
40-
position, task.getSwitchTask(), workflow, application, resourceLoader);
32+
return new JavaSwitchExecutorBuilder(position, task.getSwitchTask(), definition);
4133
} else if (task.getListenTask() != null) {
42-
return new JavaListenExecutorBuilder(
43-
position, task.getListenTask(), workflow, application, resourceLoader);
34+
return new JavaListenExecutorBuilder(position, task.getListenTask(), definition);
4435
} else {
45-
return super.getTaskExecutor(position, task, workflow, application, resourceLoader);
36+
return super.getTaskExecutor(position, task, definition);
4637
}
4738
}
4839
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
3636
private Optional<WorkflowFilter> outputFilter = Optional.empty();
3737
private final WorkflowApplication application;
3838
private final TaskExecutor<?> taskExecutor;
39+
private final ResourceLoader resourceLoader;
3940

4041
private WorkflowDefinition(
4142
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
4243
this.workflow = workflow;
4344
this.application = application;
45+
this.resourceLoader = resourceLoader;
4446
if (workflow.getInput() != null) {
4547
Input input = workflow.getInput();
4648
this.inputSchemaValidator =
@@ -55,11 +57,7 @@ private WorkflowDefinition(
5557
}
5658
this.taskExecutor =
5759
TaskExecutorHelper.createExecutorList(
58-
application.positionFactory().get(),
59-
workflow.getDo(),
60-
workflow,
61-
application,
62-
resourceLoader);
60+
application.positionFactory().get(), workflow.getDo(), this);
6361
}
6462

6563
static WorkflowDefinition of(WorkflowApplication application, Workflow workflow) {
@@ -105,6 +103,10 @@ public WorkflowApplication application() {
105103
return application;
106104
}
107105

106+
public ResourceLoader resourceLoader() {
107+
return resourceLoader;
108+
}
109+
108110
@Override
109111
public void close() {
110112
// TODO close resourcers hold for uncompleted process instances, if any

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.serverlessworkflow.impl.TaskContext;
2929
import io.serverlessworkflow.impl.WorkflowApplication;
3030
import io.serverlessworkflow.impl.WorkflowContext;
31+
import io.serverlessworkflow.impl.WorkflowDefinition;
3132
import io.serverlessworkflow.impl.WorkflowFilter;
3233
import io.serverlessworkflow.impl.WorkflowModel;
3334
import io.serverlessworkflow.impl.WorkflowMutablePosition;
@@ -81,17 +82,13 @@ public abstract static class AbstractTaskExecutorBuilder<
8182
private V instance;
8283

8384
protected AbstractTaskExecutorBuilder(
84-
WorkflowMutablePosition position,
85-
T task,
86-
Workflow workflow,
87-
WorkflowApplication application,
88-
ResourceLoader resourceLoader) {
89-
this.workflow = workflow;
85+
WorkflowMutablePosition position, T task, WorkflowDefinition definition) {
86+
this.workflow = definition.workflow();
9087
this.taskName = position.last().toString();
9188
this.position = position;
9289
this.task = task;
93-
this.application = application;
94-
this.resourceLoader = resourceLoader;
90+
this.application = definition.application();
91+
this.resourceLoader = definition.resourceLoader();
9592
if (task.getInput() != null) {
9693
Input input = task.getInput();
9794
this.inputProcessor = buildWorkflowFilter(application, input.getFrom());
@@ -174,16 +171,18 @@ protected AbstractTaskExecutor(AbstractTaskExecutorBuilder<T, ?> builder) {
174171

175172
protected final CompletableFuture<TaskContext> executeNext(
176173
CompletableFuture<TaskContext> future, WorkflowContext workflow) {
177-
return future.thenCompose(
178-
t -> {
179-
TransitionInfo transition = t.transition();
180-
if (transition.isEndNode()) {
181-
workflow.instance().status(WorkflowStatus.COMPLETED);
182-
} else if (transition.next() != null) {
183-
return transition.next().apply(workflow, t.parent(), t.output());
184-
}
185-
return CompletableFuture.completedFuture(t);
186-
});
174+
return future.thenCompose(t -> executeNext(workflow, t));
175+
}
176+
177+
private CompletableFuture<TaskContext> executeNext(
178+
WorkflowContext workflow, TaskContext taskContext) {
179+
TransitionInfo transition = taskContext.transition();
180+
if (transition.isEndNode()) {
181+
workflow.instance().status(WorkflowStatus.COMPLETED);
182+
} else if (transition.next() != null) {
183+
return transition.next().apply(workflow, taskContext.parent(), taskContext.output());
184+
}
185+
return CompletableFuture.completedFuture(taskContext);
187186
}
188187

189188
@Override

impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@
1616
package io.serverlessworkflow.impl.executors;
1717

1818
import io.serverlessworkflow.api.types.TaskBase;
19-
import io.serverlessworkflow.api.types.Workflow;
2019
import io.serverlessworkflow.impl.TaskContext;
21-
import io.serverlessworkflow.impl.WorkflowApplication;
2220
import io.serverlessworkflow.impl.WorkflowContext;
21+
import io.serverlessworkflow.impl.WorkflowDefinition;
2322
import io.serverlessworkflow.impl.WorkflowModel;
2423
import io.serverlessworkflow.impl.WorkflowMutablePosition;
25-
import io.serverlessworkflow.impl.resources.ResourceLoader;
2624
import java.util.concurrent.CompletableFuture;
2725

2826
public class CallTaskExecutor<T extends TaskBase> extends RegularTaskExecutor<T> {
@@ -36,11 +34,9 @@ public static class CallTaskExecutorBuilder<T extends TaskBase>
3634
protected CallTaskExecutorBuilder(
3735
WorkflowMutablePosition position,
3836
T task,
39-
Workflow workflow,
40-
WorkflowApplication application,
41-
ResourceLoader resourceLoader,
37+
WorkflowDefinition definition,
4238
CallableTask<T> callable) {
43-
super(position, task, workflow, application, resourceLoader);
39+
super(position, task, definition);
4440
this.callable = callable;
4541
callable.init(task, workflow, application, resourceLoader);
4642
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
import io.serverlessworkflow.api.types.CallTask;
1919
import io.serverlessworkflow.api.types.Task;
2020
import io.serverlessworkflow.api.types.TaskBase;
21-
import io.serverlessworkflow.api.types.Workflow;
22-
import io.serverlessworkflow.impl.WorkflowApplication;
21+
import io.serverlessworkflow.impl.WorkflowDefinition;
2322
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2423
import io.serverlessworkflow.impl.executors.CallTaskExecutor.CallTaskExecutorBuilder;
2524
import io.serverlessworkflow.impl.executors.DoExecutor.DoExecutorBuilder;
@@ -32,7 +31,6 @@
3231
import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder;
3332
import io.serverlessworkflow.impl.executors.TryExecutor.TryExecutorBuilder;
3433
import io.serverlessworkflow.impl.executors.WaitExecutor.WaitExecutorBuilder;
35-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3634
import java.util.ServiceLoader;
3735
import java.util.ServiceLoader.Provider;
3836

@@ -50,53 +48,34 @@ protected DefaultTaskExecutorFactory() {}
5048

5149
@Override
5250
public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
53-
WorkflowMutablePosition position,
54-
Task task,
55-
Workflow workflow,
56-
WorkflowApplication application,
57-
ResourceLoader resourceLoader) {
51+
WorkflowMutablePosition position, Task task, WorkflowDefinition definition) {
5852
if (task.getCallTask() != null) {
5953
CallTask callTask = task.getCallTask();
6054
TaskBase taskBase = (TaskBase) callTask.get();
6155
if (taskBase != null) {
6256
return new CallTaskExecutorBuilder(
63-
position,
64-
taskBase,
65-
workflow,
66-
application,
67-
resourceLoader,
68-
findCallTask(taskBase.getClass()));
57+
position, taskBase, definition, findCallTask(taskBase.getClass()));
6958
}
7059
} else if (task.getSwitchTask() != null) {
71-
return new SwitchExecutorBuilder(
72-
position, task.getSwitchTask(), workflow, application, resourceLoader);
60+
return new SwitchExecutorBuilder(position, task.getSwitchTask(), definition);
7361
} else if (task.getDoTask() != null) {
74-
return new DoExecutorBuilder(
75-
position, task.getDoTask(), workflow, application, resourceLoader);
62+
return new DoExecutorBuilder(position, task.getDoTask(), definition);
7663
} else if (task.getSetTask() != null) {
77-
return new SetExecutorBuilder(
78-
position, task.getSetTask(), workflow, application, resourceLoader);
64+
return new SetExecutorBuilder(position, task.getSetTask(), definition);
7965
} else if (task.getForTask() != null) {
80-
return new ForExecutorBuilder(
81-
position, task.getForTask(), workflow, application, resourceLoader);
66+
return new ForExecutorBuilder(position, task.getForTask(), definition);
8267
} else if (task.getRaiseTask() != null) {
83-
return new RaiseExecutorBuilder(
84-
position, task.getRaiseTask(), workflow, application, resourceLoader);
68+
return new RaiseExecutorBuilder(position, task.getRaiseTask(), definition);
8569
} else if (task.getTryTask() != null) {
86-
return new TryExecutorBuilder(
87-
position, task.getTryTask(), workflow, application, resourceLoader);
70+
return new TryExecutorBuilder(position, task.getTryTask(), definition);
8871
} else if (task.getForkTask() != null) {
89-
return new ForkExecutorBuilder(
90-
position, task.getForkTask(), workflow, application, resourceLoader);
72+
return new ForkExecutorBuilder(position, task.getForkTask(), definition);
9173
} else if (task.getWaitTask() != null) {
92-
return new WaitExecutorBuilder(
93-
position, task.getWaitTask(), workflow, application, resourceLoader);
74+
return new WaitExecutorBuilder(position, task.getWaitTask(), definition);
9475
} else if (task.getListenTask() != null) {
95-
return new ListenExecutorBuilder(
96-
position, task.getListenTask(), workflow, application, resourceLoader);
76+
return new ListenExecutorBuilder(position, task.getListenTask(), definition);
9777
} else if (task.getEmitTask() != null) {
98-
return new EmitExecutorBuilder(
99-
position, task.getEmitTask(), workflow, application, resourceLoader);
78+
return new EmitExecutorBuilder(position, task.getEmitTask(), definition);
10079
}
10180
throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet");
10281
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@
1616
package io.serverlessworkflow.impl.executors;
1717

1818
import io.serverlessworkflow.api.types.DoTask;
19-
import io.serverlessworkflow.api.types.Workflow;
2019
import io.serverlessworkflow.impl.TaskContext;
21-
import io.serverlessworkflow.impl.WorkflowApplication;
2220
import io.serverlessworkflow.impl.WorkflowContext;
21+
import io.serverlessworkflow.impl.WorkflowDefinition;
2322
import io.serverlessworkflow.impl.WorkflowModel;
2423
import io.serverlessworkflow.impl.WorkflowMutablePosition;
25-
import io.serverlessworkflow.impl.resources.ResourceLoader;
2624
import java.util.Optional;
2725
import java.util.concurrent.CompletableFuture;
2826

@@ -34,15 +32,9 @@ public static class DoExecutorBuilder extends RegularTaskExecutorBuilder<DoTask>
3432
private TaskExecutor<?> taskExecutor;
3533

3634
protected DoExecutorBuilder(
37-
WorkflowMutablePosition position,
38-
DoTask task,
39-
Workflow workflow,
40-
WorkflowApplication application,
41-
ResourceLoader resourceLoader) {
42-
super(position, task, workflow, application, resourceLoader);
43-
taskExecutor =
44-
TaskExecutorHelper.createExecutorList(
45-
position, task.getDo(), workflow, application, resourceLoader);
35+
WorkflowMutablePosition position, DoTask task, WorkflowDefinition definition) {
36+
super(position, task, definition);
37+
taskExecutor = TaskExecutorHelper.createExecutorList(position, task.getDo(), definition);
4638
}
4739

4840
@Override

0 commit comments

Comments
 (0)