Skip to content

Commit e3fdf68

Browse files
committed
[Fix #782] Refining approach
Signed-off-by: fjtirado <[email protected]>
1 parent 806c8eb commit e3fdf68

29 files changed

+864
-251
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/StringBufferWorkflowPosition.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.serverlessworkflow.impl;
1717

18+
import java.util.Objects;
19+
1820
public class StringBufferWorkflowPosition implements WorkflowMutablePosition {
1921

2022
private StringBuilder sb;
@@ -67,4 +69,18 @@ public Object last() {
6769
int indexOf = sb.lastIndexOf("/");
6870
return indexOf != -1 ? jsonPointer().substring(indexOf + 1) : "";
6971
}
72+
73+
@Override
74+
public int hashCode() {
75+
return Objects.hash(sb);
76+
}
77+
78+
@Override
79+
public boolean equals(Object obj) {
80+
if (this == obj) return true;
81+
if (obj == null) return false;
82+
if (getClass() != obj.getClass()) return false;
83+
StringBufferWorkflowPosition other = (StringBufferWorkflowPosition) obj;
84+
return Objects.equals(sb, other.sb);
85+
}
7086
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import io.serverlessworkflow.impl.resources.ResourceLoader;
2626
import io.serverlessworkflow.impl.schema.SchemaValidator;
2727
import java.nio.file.Path;
28+
import java.util.HashMap;
29+
import java.util.Map;
2830
import java.util.Optional;
2931

3032
public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData {
@@ -37,6 +39,7 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
3739
private final WorkflowApplication application;
3840
private final TaskExecutor<?> taskExecutor;
3941
private final ResourceLoader resourceLoader;
42+
private final Map<String, TaskExecutor<?>> executors = new HashMap<>();
4043

4144
private WorkflowDefinition(
4245
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
@@ -110,6 +113,14 @@ public ResourceLoader resourceLoader() {
110113
return resourceLoader;
111114
}
112115

116+
public TaskExecutor<?> taskExecutor(WorkflowPosition position) {
117+
return executors.get(position.jsonPointer());
118+
}
119+
120+
public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor<?> taskExecutor) {
121+
executors.put(position.jsonPointer(), taskExecutor);
122+
}
123+
113124
@Override
114125
public void close() {}
115126
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class WorkflowMutableInstance implements WorkflowInstance {
4949
private CompletableFuture<WorkflowModel> completableFuture;
5050
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;
5151

52-
public WorkflowMutableInstance(
52+
protected WorkflowMutableInstance(
5353
WorkflowDefinition definition, String id, WorkflowModel input, WorkflowStatus status) {
5454
this.id = id;
5555
this.input = input;
@@ -236,13 +236,6 @@ public CompletableFuture<TaskContext> suspendedCheck(TaskContext t) {
236236
return CompletableFuture.completedFuture(t);
237237
}
238238

239-
// internal purposes only, not to be invoked directly by users of the API
240-
public void restore(
241-
WorkflowPosition position, WorkflowModel model, WorkflowModel context, Instant startedAt) {
242-
this.startedAt = startedAt;
243-
workflowContext.context(context);
244-
}
245-
246239
@Override
247240
public boolean cancel() {
248241
try {

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,13 @@ public abstract static class AbstractTaskExecutorBuilder<
7878
protected final WorkflowApplication application;
7979
protected final Workflow workflow;
8080
protected final ResourceLoader resourceLoader;
81+
private final WorkflowDefinition definition;
8182

8283
private V instance;
8384

8485
protected AbstractTaskExecutorBuilder(
8586
WorkflowMutablePosition position, T task, WorkflowDefinition definition) {
87+
this.definition = definition;
8688
this.workflow = definition.workflow();
8789
this.taskName = position.last().toString();
8890
this.position = position;
@@ -147,6 +149,7 @@ public V build() {
147149
if (instance == null) {
148150
instance = buildInstance();
149151
buildTransition(instance);
152+
definition.addTaskExecutor(position, instance);
150153
}
151154
return instance;
152155
}
@@ -256,6 +259,10 @@ private void handleException(
256259
}
257260
}
258261

262+
public WorkflowPosition position() {
263+
return position;
264+
}
265+
259266
protected abstract TransitionInfo getSkipTransition();
260267

261268
protected abstract CompletableFuture<TaskContext> execute(

impl/persistence/api/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
<dependency>
1212
<groupId>io.serverlessworkflow</groupId>
1313
<artifactId>serverlessworkflow-impl-core</artifactId>
14-
</dependency>
14+
<version>8.0.0-SNAPSHOT</version>
15+
</dependency>
1516
</dependencies>
1617
</project>
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.marshaller;
17+
18+
import java.time.Instant;
19+
import java.util.ArrayList;
20+
import java.util.Collection;
21+
import java.util.LinkedHashMap;
22+
import java.util.Map;
23+
24+
public abstract class AbstractInputBuffer implements WorkflowInputBuffer {
25+
26+
private final Collection<CustomObjectMarshaller> customMarshallers;
27+
28+
protected AbstractInputBuffer(Collection<CustomObjectMarshaller> customMarshallers) {
29+
this.customMarshallers = customMarshallers;
30+
}
31+
32+
@Override
33+
public <T extends Enum<T>> T readEnum(Class<T> enumClass) {
34+
return Enum.valueOf(enumClass, readString());
35+
}
36+
37+
@Override
38+
public Instant readInstant() {
39+
return Instant.ofEpochMilli(readLong());
40+
}
41+
42+
@Override
43+
public Map<String, Object> readMap() {
44+
int size = readInt();
45+
Map<String, Object> map = new LinkedHashMap<String, Object>(size);
46+
while (size-- > 0) {
47+
map.put(readString(), readObject());
48+
}
49+
return map;
50+
}
51+
52+
@Override
53+
public Collection<Object> readCollection() {
54+
int size = readInt();
55+
Collection<Object> col = new ArrayList<>(size);
56+
while (size-- > 0) {
57+
col.add(readObject());
58+
}
59+
return col;
60+
}
61+
62+
protected Type readType() {
63+
return Type.values()[readByte()];
64+
}
65+
66+
@Override
67+
public Object readObject() {
68+
69+
Type type = readType();
70+
71+
switch (type) {
72+
case NULL:
73+
return null;
74+
75+
case SHORT:
76+
return readShort();
77+
78+
case LONG:
79+
return readLong();
80+
81+
case INT:
82+
return readInt();
83+
84+
case BYTE:
85+
return readByte();
86+
87+
case BYTES:
88+
return readBytes();
89+
90+
case FLOAT:
91+
return readFloat();
92+
93+
case DOUBLE:
94+
return readDouble();
95+
96+
case BOOLEAN:
97+
return readBoolean();
98+
99+
case STRING:
100+
return readString();
101+
102+
case MAP:
103+
return readMap();
104+
105+
case COLLECTION:
106+
return readCollection();
107+
108+
case INSTANT:
109+
return readInstant();
110+
111+
default:
112+
return readCustomObject();
113+
}
114+
}
115+
116+
protected Class<?> readClass() {
117+
String className = readString();
118+
try {
119+
return Class.forName(className);
120+
} catch (ClassNotFoundException ex) {
121+
throw new IllegalStateException(ex);
122+
}
123+
}
124+
125+
protected Class<?> loadClass(String className) throws ClassNotFoundException {
126+
return Class.forName(className);
127+
}
128+
129+
protected Object readCustomObject() {
130+
Class<?> objectClass = readClass();
131+
return customMarshallers.stream()
132+
.filter(m -> m.getObjectClass().isAssignableFrom(objectClass))
133+
.findFirst()
134+
.map(m -> m.read(this))
135+
.orElseThrow(() -> new IllegalArgumentException("Unsupported type " + objectClass));
136+
}
137+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.marshaller;
17+
18+
import java.time.Instant;
19+
import java.util.Collection;
20+
import java.util.Map;
21+
22+
public abstract class AbstractOutputBuffer implements WorkflowOutputBuffer {
23+
24+
private final Collection<CustomObjectMarshaller> customMarshallers;
25+
26+
protected AbstractOutputBuffer(Collection<CustomObjectMarshaller> customMarshallers) {
27+
this.customMarshallers = customMarshallers;
28+
}
29+
30+
@Override
31+
public WorkflowOutputBuffer writeInstant(Instant instant) {
32+
writeLong(instant.getEpochSecond());
33+
return this;
34+
}
35+
36+
@Override
37+
public <T extends Enum<T>> WorkflowOutputBuffer writeEnum(T value) {
38+
writeString(value.name());
39+
return this;
40+
}
41+
42+
@Override
43+
public WorkflowOutputBuffer writeMap(Map<String, Object> map) {
44+
writeInt(map.size());
45+
map.forEach(
46+
(k, v) -> {
47+
writeString(k);
48+
writeObject(v);
49+
});
50+
51+
return this;
52+
}
53+
54+
@Override
55+
public WorkflowOutputBuffer writeCollection(Collection<Object> col) {
56+
writeInt(col.size());
57+
col.forEach(this::writeObject);
58+
return this;
59+
}
60+
61+
@Override
62+
public WorkflowOutputBuffer writeObject(Object object) {
63+
if (object == null) {
64+
writeType(Type.NULL);
65+
} else if (object instanceof Short number) {
66+
writeType(Type.SHORT);
67+
writeShort(number);
68+
} else if (object instanceof Integer number) {
69+
writeType(Type.INT);
70+
writeInt(number);
71+
} else if (object instanceof Long number) {
72+
writeType(Type.LONG);
73+
writeLong(number);
74+
} else if (object instanceof Byte number) {
75+
writeType(Type.BYTE);
76+
writeLong(number);
77+
} else if (object instanceof Float number) {
78+
writeType(Type.FLOAT);
79+
writeFloat(number);
80+
} else if (object instanceof Double number) {
81+
writeType(Type.DOUBLE);
82+
writeDouble(number);
83+
} else if (object instanceof Boolean bool) {
84+
writeType(Type.BOOLEAN);
85+
writeBoolean(bool);
86+
} else if (object instanceof String str) {
87+
writeType(Type.STRING);
88+
writeString(str);
89+
} else if (object instanceof Map value) {
90+
writeType(Type.MAP);
91+
writeMap(value);
92+
} else if (object instanceof Collection value) {
93+
writeType(Type.COLLECTION);
94+
writeCollection(value);
95+
} else if (object instanceof Instant value) {
96+
writeType(Type.INSTANT);
97+
writeInstant(value);
98+
} else if (object instanceof byte[] bytes) {
99+
writeType(Type.BYTES);
100+
writeBytes(bytes);
101+
} else {
102+
writeCustomObject(object);
103+
}
104+
return this;
105+
}
106+
107+
protected void writeClass(Class<?> objectClass) {
108+
writeString(objectClass.getCanonicalName());
109+
}
110+
111+
protected void writeCustomObject(Object object) {
112+
customMarshallers.stream()
113+
.filter(m -> m.getObjectClass().isAssignableFrom(object.getClass()))
114+
.findFirst()
115+
.ifPresentOrElse(
116+
m -> {
117+
writeClass(m.getObjectClass());
118+
m.write(this, m.getObjectClass().cast(object));
119+
},
120+
() -> new IllegalArgumentException("Unsupported type " + object.getClass()));
121+
}
122+
123+
protected void writeType(Type type) {
124+
writeByte((byte) type.ordinal());
125+
}
126+
}

0 commit comments

Comments
 (0)