Skip to content

Commit 94c645b

Browse files
committed
[Fix #782] Refining approach
Signed-off-by: fjtirado <[email protected]>
1 parent 806c8eb commit 94c645b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1185
-382
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class TaskContext implements TaskContextData {
3737
private WorkflowModel rawOutput;
3838
private Instant completedAt;
3939
private TransitionInfo transition;
40+
private boolean completed;
4041

4142
public TaskContext(
4243
WorkflowModel input,
@@ -109,6 +110,7 @@ public WorkflowModel rawOutput() {
109110

110111
public TaskContext output(WorkflowModel output) {
111112
this.output = output;
113+
this.completed = true;
112114
return this;
113115
}
114116

@@ -159,6 +161,10 @@ public TaskContext transition(TransitionInfo transition) {
159161
return this;
160162
}
161163

164+
public boolean isCompleted() {
165+
return completed;
166+
}
167+
162168
@Override
163169
public String toString() {
164170
return "TaskContext [position="

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public class WorkflowApplication implements AutoCloseable {
6060
private final Collection<EventPublisher> eventPublishers;
6161
private final boolean lifeCycleCEPublishingEnabled;
6262

63-
protected WorkflowApplication(Builder builder) {
63+
private WorkflowApplication(Builder builder) {
6464
this.taskFactory = builder.taskFactory;
6565
this.exprFactory = builder.exprFactory;
6666
this.resourceLoaderFactory = builder.resourceLoaderFactory;
@@ -148,7 +148,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
148148
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
149149
private boolean lifeCycleCEPublishingEnabled = true;
150150

151-
protected Builder() {}
151+
private Builder() {}
152152

153153
public Builder withListener(WorkflowExecutionListener listener) {
154154
listeners.add(listener);
@@ -256,11 +256,7 @@ public Map<WorkflowDefinitionId, WorkflowDefinition> workflowDefinitions() {
256256

257257
public WorkflowDefinition workflowDefinition(Workflow workflow) {
258258
return definitions.computeIfAbsent(
259-
WorkflowDefinitionId.of(workflow), k -> createDefinition(workflow));
260-
}
261-
262-
protected WorkflowDefinition createDefinition(Workflow workflow) {
263-
return WorkflowDefinition.of(this, workflow);
259+
WorkflowDefinitionId.of(workflow), k -> WorkflowDefinition.of(this, workflow));
264260
}
265261

266262
@Override

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import io.serverlessworkflow.impl.resources.ResourceLoader;
2626
import io.serverlessworkflow.impl.schema.SchemaValidator;
2727
import java.nio.file.Path;
28+
import java.util.HashMap;
29+
import java.util.Map;
2830
import java.util.Optional;
2931

3032
public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData {
@@ -37,6 +39,7 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
3739
private final WorkflowApplication application;
3840
private final TaskExecutor<?> taskExecutor;
3941
private final ResourceLoader resourceLoader;
42+
private final Map<String, TaskExecutor<?>> executors = new HashMap<>();
4043

4144
private WorkflowDefinition(
4245
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
@@ -72,8 +75,7 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
7275
public WorkflowInstance instance(Object input) {
7376
WorkflowModel inputModel = application.modelFactory().fromAny(input);
7477
inputSchemaValidator().ifPresent(v -> v.validate(inputModel));
75-
return new WorkflowMutableInstance(
76-
this, application().idFactory().get(), inputModel, WorkflowStatus.PENDING);
78+
return new WorkflowMutableInstance(this, application().idFactory().get(), inputModel);
7779
}
7880

7981
Optional<SchemaValidator> inputSchemaValidator() {
@@ -110,6 +112,14 @@ public ResourceLoader resourceLoader() {
110112
return resourceLoader;
111113
}
112114

115+
public TaskExecutor<?> taskExecutor(String jsonPointer) {
116+
return executors.get(jsonPointer);
117+
}
118+
119+
public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor<?> taskExecutor) {
120+
executors.put(position.jsonPointer(), taskExecutor);
121+
}
122+
113123
@Override
114124
public void close() {}
115125
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -38,32 +38,43 @@
3838
public class WorkflowMutableInstance implements WorkflowInstance {
3939

4040
protected final AtomicReference<WorkflowStatus> status;
41-
private final String id;
42-
private final WorkflowModel input;
41+
protected final String id;
42+
protected final WorkflowModel input;
43+
44+
protected final WorkflowContext workflowContext;
45+
protected Instant startedAt;
46+
47+
protected AtomicReference<CompletableFuture<WorkflowModel>> futureRef = new AtomicReference<>();
48+
protected Instant completedAt;
4349

44-
private WorkflowContext workflowContext;
45-
private Instant startedAt;
46-
private Instant completedAt;
47-
private volatile WorkflowModel output;
4850
private Lock statusLock = new ReentrantLock();
49-
private CompletableFuture<WorkflowModel> completableFuture;
5051
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;
5152

52-
public WorkflowMutableInstance(
53-
WorkflowDefinition definition, String id, WorkflowModel input, WorkflowStatus status) {
53+
public WorkflowMutableInstance(WorkflowDefinition definition, String id, WorkflowModel input) {
5454
this.id = id;
5555
this.input = input;
56-
this.status = new AtomicReference<>(status);
56+
this.status = new AtomicReference<>(WorkflowStatus.PENDING);
5757
this.workflowContext = new WorkflowContext(definition, this);
5858
}
5959

6060
@Override
6161
public CompletableFuture<WorkflowModel> start() {
62-
startedAt = Instant.now();
63-
status.set(WorkflowStatus.RUNNING);
64-
publishEvent(
65-
workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext)));
66-
this.completableFuture =
62+
return startExecution(
63+
() -> {
64+
startedAt = Instant.now();
65+
status.set(WorkflowStatus.RUNNING);
66+
publishEvent(
67+
workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext)));
68+
});
69+
}
70+
71+
protected final CompletableFuture<WorkflowModel> startExecution(Runnable runnable) {
72+
CompletableFuture<WorkflowModel> future = futureRef.get();
73+
if (future != null) {
74+
return future;
75+
}
76+
runnable.run();
77+
future =
6778
TaskExecutorHelper.processTaskList(
6879
workflowContext.definition().startTask(),
6980
workflowContext,
@@ -75,7 +86,8 @@ public CompletableFuture<WorkflowModel> start() {
7586
.orElse(input))
7687
.whenComplete(this::whenFailed)
7788
.thenApply(this::whenSuccess);
78-
return completableFuture;
89+
futureRef.set(future);
90+
return future;
7991
}
8092

8193
private void whenFailed(WorkflowModel result, Throwable ex) {
@@ -94,7 +106,7 @@ private void handleException(Throwable ex) {
94106
}
95107

96108
private WorkflowModel whenSuccess(WorkflowModel node) {
97-
output =
109+
WorkflowModel output =
98110
workflowContext
99111
.definition()
100112
.outputFilter()
@@ -103,7 +115,8 @@ private WorkflowModel whenSuccess(WorkflowModel node) {
103115
workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output));
104116
status.set(WorkflowStatus.COMPLETED);
105117
publishEvent(
106-
workflowContext, l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext)));
118+
workflowContext,
119+
l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext, output)));
107120
return output;
108121
}
109122

@@ -134,19 +147,17 @@ public WorkflowStatus status() {
134147

135148
@Override
136149
public WorkflowModel output() {
137-
return output;
150+
return futureRef.get().join();
138151
}
139152

140153
@Override
141154
public <T> T outputAs(Class<T> clazz) {
142-
return output != null
143-
? output
144-
.as(clazz)
145-
.orElseThrow(
146-
() ->
147-
new IllegalArgumentException(
148-
"Output " + output + " cannot be converted to class " + clazz))
149-
: null;
155+
return output()
156+
.as(clazz)
157+
.orElseThrow(
158+
() ->
159+
new IllegalArgumentException(
160+
"Output " + output() + " cannot be converted to class " + clazz));
150161
}
151162

152163
public void status(WorkflowStatus state) {
@@ -236,13 +247,6 @@ public CompletableFuture<TaskContext> suspendedCheck(TaskContext t) {
236247
return CompletableFuture.completedFuture(t);
237248
}
238249

239-
// internal purposes only, not to be invoked directly by users of the API
240-
public void restore(
241-
WorkflowPosition position, WorkflowModel model, WorkflowModel context, Instant startedAt) {
242-
this.startedAt = startedAt;
243-
workflowContext.context(context);
244-
}
245-
246250
@Override
247251
public boolean cancel() {
248252
try {
@@ -260,4 +264,6 @@ public boolean cancel() {
260264
statusLock.unlock();
261265
}
262266
}
267+
268+
public void restoreContext(WorkflowDefinition definition, TaskContext context) {}
263269
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,13 @@ public abstract static class AbstractTaskExecutorBuilder<
7878
protected final WorkflowApplication application;
7979
protected final Workflow workflow;
8080
protected final ResourceLoader resourceLoader;
81+
private final WorkflowDefinition definition;
8182

8283
private V instance;
8384

8485
protected AbstractTaskExecutorBuilder(
8586
WorkflowMutablePosition position, T task, WorkflowDefinition definition) {
87+
this.definition = definition;
8688
this.workflow = definition.workflow();
8789
this.taskName = position.last().toString();
8890
this.position = position;
@@ -147,6 +149,7 @@ public V build() {
147149
if (instance == null) {
148150
instance = buildInstance();
149151
buildTransition(instance);
152+
definition.addTaskExecutor(position, instance);
150153
}
151154
return instance;
152155
}
@@ -189,8 +192,9 @@ private CompletableFuture<TaskContext> executeNext(
189192
public CompletableFuture<TaskContext> apply(
190193
WorkflowContext workflowContext, Optional<TaskContext> parentContext, WorkflowModel input) {
191194
TaskContext taskContext = new TaskContext(input, position, parentContext, taskName, task);
195+
workflowContext.instance().restoreContext(workflowContext.definition(), taskContext);
192196
CompletableFuture<TaskContext> completable = CompletableFuture.completedFuture(taskContext);
193-
if (!TaskExecutorHelper.isActive(workflowContext)) {
197+
if (taskContext.isCompleted() && !TaskExecutorHelper.isActive(workflowContext)) {
194198
return completable;
195199
}
196200
if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) {
@@ -256,6 +260,10 @@ private void handleException(
256260
}
257261
}
258262

263+
public WorkflowPosition position() {
264+
return position;
265+
}
266+
259267
protected abstract TransitionInfo getSkipTransition();
260268

261269
protected abstract CompletableFuture<TaskContext> execute(

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowCompletedEvent.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,18 @@
1616
package io.serverlessworkflow.impl.lifecycle;
1717

1818
import io.serverlessworkflow.impl.WorkflowContextData;
19+
import io.serverlessworkflow.impl.WorkflowModel;
1920

2021
public class WorkflowCompletedEvent extends WorkflowEvent {
2122

22-
public WorkflowCompletedEvent(WorkflowContextData workflow) {
23+
private WorkflowModel output;
24+
25+
public WorkflowCompletedEvent(WorkflowContextData workflow, WorkflowModel output) {
2326
super(workflow);
27+
this.output = output;
28+
}
29+
30+
public WorkflowModel output() {
31+
return output;
2432
}
2533
}

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ public void onWorkflowCompleted(WorkflowCompletedEvent event) {
225225
builder()
226226
.withData(
227227
cloudEventData(
228-
new WorkflowCompletedCEData(id(ev), ref(ev), ev.eventDate(), output(ev)),
228+
new WorkflowCompletedCEData(
229+
id(ev), ref(ev), ev.eventDate(), from(event.output())),
229230
this::convert))
230231
.withType(WORKFLOW_COMPLETED)
231232
.build());
@@ -328,10 +329,6 @@ private static String pos(TaskEvent ev) {
328329
return ev.taskContext().position().jsonPointer();
329330
}
330331

331-
private static Object output(WorkflowEvent ev) {
332-
return from(ev.workflowContext().instanceData().output());
333-
}
334-
335332
private static Object output(TaskEvent ev) {
336333
return from(ev.taskContext().output());
337334
}

impl/jackson/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<version>8.0.0-SNAPSHOT</version>
77
</parent>
88
<artifactId>serverlessworkflow-impl-jackson</artifactId>
9-
<name> Serverless Workflow :: Impl :: HTTP </name>
9+
<name> Serverless Workflow :: Impl :: Jackson </name>
1010
<dependencies>
1111
<dependency>
1212
<groupId>io.serverlessworkflow</groupId>

impl/persistence/api/pom.xml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
<version>8.0.0-SNAPSHOT</version>
77
</parent>
88
<artifactId>serverlessworkflow-persistence-api</artifactId>
9-
<name>Serverless Workflow :: Impl :: Pesistence:: API</name>
9+
<name>Serverless Workflow :: Impl :: Persistence:: API</name>
1010
<dependencies>
1111
<dependency>
1212
<groupId>io.serverlessworkflow</groupId>
1313
<artifactId>serverlessworkflow-impl-core</artifactId>
14-
</dependency>
14+
<version>8.0.0-SNAPSHOT</version>
15+
</dependency>
1516
</dependencies>
1617
</project>

0 commit comments

Comments
 (0)