Skip to content

Commit 1039702

Browse files
committed
[Fix #782] Refining approach
1 parent cc0cb1a commit 1039702

22 files changed

+508
-254
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class WorkflowMutableInstance implements WorkflowInstance {
4949
private CompletableFuture<WorkflowModel> completableFuture;
5050
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;
5151

52-
public WorkflowMutableInstance(
52+
protected WorkflowMutableInstance(
5353
WorkflowDefinition definition, String id, WorkflowModel input, WorkflowStatus status) {
5454
this.id = id;
5555
this.input = input;

impl/persistence/api/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
<dependency>
1212
<groupId>io.serverlessworkflow</groupId>
1313
<artifactId>serverlessworkflow-impl-core</artifactId>
14-
</dependency>
14+
<version>8.0.0-SNAPSHOT</version>
15+
</dependency>
1516
</dependencies>
1617
</project>
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.marshaller;
17+
18+
import java.time.Instant;
19+
import java.util.Collection;
20+
import java.util.Map;
21+
22+
public abstract class AbstractOutputBuffer implements WorkflowOutputBuffer {
23+
24+
private final Collection<CustomObjectMarshaller> customMarshallers;
25+
26+
protected AbstractOutputBuffer(Collection<CustomObjectMarshaller> customMarshallers) {
27+
this.customMarshallers = customMarshallers;
28+
}
29+
30+
@Override
31+
public WorkflowOutputBuffer writeInstant(Instant instant) {
32+
writeLong(instant.getEpochSecond());
33+
return this;
34+
}
35+
36+
@Override
37+
public <T extends Enum<T>> WorkflowOutputBuffer writeEnum(T value) {
38+
writeString(value.name());
39+
return this;
40+
}
41+
42+
@Override
43+
public WorkflowOutputBuffer writeMap(Map<String, Object> map) {
44+
writeInt(map.size());
45+
map.forEach(
46+
(k, v) -> {
47+
writeString(k);
48+
writeObject(v);
49+
});
50+
51+
return this;
52+
}
53+
54+
@Override
55+
public WorkflowOutputBuffer writeCollection(Collection<Object> col) {
56+
writeInt(col.size());
57+
col.forEach(this::writeObject);
58+
return this;
59+
}
60+
61+
private enum TYPE {
62+
BYTE,
63+
BYTES,
64+
INT,
65+
SHORT,
66+
LONG,
67+
FLOAT,
68+
DOUBLE,
69+
BOOLEAN,
70+
STRING,
71+
INSTANT,
72+
MAP,
73+
COLLECTION,
74+
NULL
75+
}
76+
77+
@Override
78+
public WorkflowOutputBuffer writeObject(Object object) {
79+
if (object == null) {
80+
writeType(TYPE.NULL);
81+
} else if (object instanceof Short number) {
82+
writeType(TYPE.SHORT);
83+
writeShort(number);
84+
} else if (object instanceof Integer number) {
85+
writeType(TYPE.INT);
86+
writeInt(number);
87+
} else if (object instanceof Long number) {
88+
writeType(TYPE.LONG);
89+
writeLong(number);
90+
} else if (object instanceof Byte number) {
91+
writeType(TYPE.BYTE);
92+
writeLong(number);
93+
} else if (object instanceof Float number) {
94+
writeType(TYPE.FLOAT);
95+
writeFloat(number);
96+
} else if (object instanceof Double number) {
97+
writeType(TYPE.DOUBLE);
98+
writeDouble(number);
99+
} else if (object instanceof Boolean bool) {
100+
writeType(TYPE.BOOLEAN);
101+
writeBoolean(bool);
102+
} else if (object instanceof String str) {
103+
writeType(TYPE.STRING);
104+
writeString(str);
105+
} else if (object instanceof Map value) {
106+
writeType(TYPE.MAP);
107+
writeMap(value);
108+
} else if (object instanceof Collection value) {
109+
writeType(TYPE.COLLECTION);
110+
writeCollection(value);
111+
} else if (object instanceof Instant value) {
112+
writeType(TYPE.INSTANT);
113+
writeInstant(value);
114+
} else if (object instanceof byte[] bytes) {
115+
writeType(TYPE.BYTES);
116+
writeBytes(bytes);
117+
} else {
118+
writeCustomObject(object);
119+
}
120+
return this;
121+
}
122+
123+
protected void writeClass(Class<?> objectClass) {
124+
writeString(objectClass.getCanonicalName());
125+
}
126+
127+
protected void writeCustomObject(Object object) {
128+
customMarshallers.stream()
129+
.filter(m -> m.getObjectClass().isAssignableFrom(object.getClass()))
130+
.findFirst()
131+
.ifPresentOrElse(
132+
m -> {
133+
writeClass(m.getObjectClass());
134+
m.write(this, m.getObjectClass().cast(object));
135+
},
136+
() -> new IllegalArgumentException("Unsupported type " + object.getClass()));
137+
}
138+
139+
private void writeType(TYPE type) {
140+
writeByte((byte) type.ordinal());
141+
}
142+
}
Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,12 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.serverlessworkflow.impl.persistence;
16+
package io.serverlessworkflow.impl.marshaller;
1717

18-
import io.serverlessworkflow.impl.WorkflowDefinition;
19-
import io.serverlessworkflow.impl.WorkflowInstance;
20-
import java.util.stream.Stream;
18+
public interface CustomObjectMarshaller<T> {
19+
void write(WorkflowOutputBuffer buffer, T object);
2120

22-
public interface WorkflowMinimumPersistenceReader extends AutoCloseable {
21+
T read(WorkflowInputBuffer buffer);
2322

24-
/**
25-
* Allow streaming over all stored workflow instances for a certain definition
26-
*
27-
* @return
28-
*/
29-
Stream<WorkflowInstance> all(WorkflowDefinition definition);
23+
Class<T> getObjectClass();
3024
}
Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,27 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.serverlessworkflow.impl.persistence.bigmap;
16+
package io.serverlessworkflow.impl.marshaller;
1717

1818
import java.io.InputStream;
1919
import java.io.OutputStream;
20+
import java.util.Collection;
2021

2122
public class DefaultBufferFactory implements WorkflowBufferFactory {
2223

24+
private final Collection<CustomObjectMarshaller> marshallers;
25+
26+
public DefaultBufferFactory(Collection<CustomObjectMarshaller> marshallers) {
27+
this.marshallers = marshallers;
28+
}
29+
2330
@Override
2431
public WorkflowInputBuffer input(InputStream input) {
25-
return new DefaultInputBuffer(input);
32+
return new DefaultInputBuffer(input, marshallers);
2633
}
2734

2835
@Override
2936
public WorkflowOutputBuffer output(OutputStream output) {
30-
return new DefaultOutputBuffer(output);
37+
return new DefaultOutputBuffer(output, marshallers);
3138
}
3239
}
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,20 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.serverlessworkflow.impl.persistence.bigmap;
16+
package io.serverlessworkflow.impl.marshaller;
1717

1818
import java.io.DataInputStream;
1919
import java.io.IOException;
2020
import java.io.InputStream;
2121
import java.io.UncheckedIOException;
2222
import java.time.Instant;
23+
import java.util.Collection;
2324

2425
public class DefaultInputBuffer implements WorkflowInputBuffer {
2526

2627
private DataInputStream input;
2728

28-
public DefaultInputBuffer(InputStream in) {
29+
public DefaultInputBuffer(InputStream in, Collection<CustomObjectMarshaller> marshallers) {
2930
input = new DataInputStream(in);
3031
}
3132

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,21 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.serverlessworkflow.impl.persistence.bigmap;
16+
package io.serverlessworkflow.impl.marshaller;
1717

1818
import java.io.DataOutputStream;
1919
import java.io.IOException;
2020
import java.io.OutputStream;
2121
import java.io.UncheckedIOException;
22-
import java.time.Instant;
22+
import java.util.Collection;
2323

24-
public class DefaultOutputBuffer implements WorkflowOutputBuffer {
24+
public class DefaultOutputBuffer extends AbstractOutputBuffer {
2525

2626
private DataOutputStream output;
2727

28-
public DefaultOutputBuffer(OutputStream out) {
28+
public DefaultOutputBuffer(
29+
OutputStream out, Collection<CustomObjectMarshaller> customMarshallers) {
30+
super(customMarshallers);
2931
output = new DataOutputStream(out);
3032
}
3133

@@ -103,7 +105,6 @@ public WorkflowOutputBuffer writeBoolean(boolean bool) {
103105
public WorkflowOutputBuffer writeByte(byte one) {
104106
try {
105107
output.writeByte(one);
106-
;
107108
} catch (IOException e) {
108109
throw new UncheckedIOException(e);
109110
}
@@ -121,18 +122,6 @@ public WorkflowOutputBuffer writeBytes(byte[] bytes) {
121122
return this;
122123
}
123124

124-
@Override
125-
public WorkflowOutputBuffer writeInstant(Instant instant) {
126-
writeLong(instant.getEpochSecond());
127-
return this;
128-
}
129-
130-
@Override
131-
public <T extends Enum<T>> WorkflowOutputBuffer writeEnum(T value) {
132-
writeString(value.name());
133-
return this;
134-
}
135-
136125
@Override
137126
public void close() {
138127
try {
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.serverlessworkflow.impl.persistence.bigmap;
16+
package io.serverlessworkflow.impl.marshaller;
1717

1818
import java.io.InputStream;
1919
import java.io.OutputStream;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.serverlessworkflow.impl.persistence.bigmap;
16+
package io.serverlessworkflow.impl.marshaller;
1717

1818
import java.io.Closeable;
1919
import java.time.Instant;
Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,13 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.serverlessworkflow.impl.persistence.bigmap;
16+
package io.serverlessworkflow.impl.marshaller;
1717

18-
import java.io.Closeable;
1918
import java.time.Instant;
19+
import java.util.Collection;
20+
import java.util.Map;
2021

21-
public interface WorkflowOutputBuffer extends Closeable {
22+
public interface WorkflowOutputBuffer extends AutoCloseable {
2223

2324
WorkflowOutputBuffer writeString(String text);
2425

@@ -40,6 +41,12 @@ public interface WorkflowOutputBuffer extends Closeable {
4041

4142
WorkflowOutputBuffer writeInstant(Instant instant);
4243

44+
WorkflowOutputBuffer writeMap(Map<String, Object> map);
45+
46+
WorkflowOutputBuffer writeCollection(Collection<Object> col);
47+
48+
WorkflowOutputBuffer writeObject(Object object);
49+
4350
<T extends Enum<T>> WorkflowOutputBuffer writeEnum(T value);
4451

4552
void close();

0 commit comments

Comments
 (0)