Skip to content

Commit 7766815

Browse files
committed
[Fix #782] Fixes after manual testing
Signed-off-by: fjtirado <[email protected]>
1 parent 94c645b commit 7766815

File tree

34 files changed

+337
-146
lines changed

34 files changed

+337
-146
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.serverlessworkflow.impl;
1717

18+
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
19+
1820
import io.serverlessworkflow.api.types.SchemaInline;
1921
import io.serverlessworkflow.api.types.Workflow;
2022
import io.serverlessworkflow.impl.events.EventConsumer;
@@ -39,13 +41,9 @@
3941
import java.util.concurrent.ConcurrentHashMap;
4042
import java.util.concurrent.ExecutorService;
4143
import java.util.stream.Collectors;
42-
import org.slf4j.Logger;
43-
import org.slf4j.LoggerFactory;
4444

4545
public class WorkflowApplication implements AutoCloseable {
4646

47-
private static final Logger logger = LoggerFactory.getLogger(WorkflowApplication.class);
48-
4947
private final TaskExecutorFactory taskFactory;
5048
private final ExpressionFactory exprFactory;
5149
private final ResourceLoaderFactory resourceLoaderFactory;
@@ -271,14 +269,11 @@ public void close() {
271269
safeClose(definition);
272270
}
273271
definitions.clear();
274-
}
275272

276-
private void safeClose(AutoCloseable closeable) {
277-
try {
278-
closeable.close();
279-
} catch (Exception ex) {
280-
logger.warn("Error closing resource {}", closeable.getClass().getName(), ex);
273+
for (WorkflowExecutionListener listener : listeners) {
274+
listener.close();
281275
}
276+
listeners.clear();
282277
}
283278

284279
public WorkflowPositionFactory positionFactory() {

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class WorkflowMutableInstance implements WorkflowInstance {
5050
private Lock statusLock = new ReentrantLock();
5151
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;
5252

53-
public WorkflowMutableInstance(WorkflowDefinition definition, String id, WorkflowModel input) {
53+
protected WorkflowMutableInstance(WorkflowDefinition definition, String id, WorkflowModel input) {
5454
this.id = id;
5555
this.input = input;
5656
this.status = new AtomicReference<>(WorkflowStatus.PENDING);
@@ -265,5 +265,5 @@ public boolean cancel() {
265265
}
266266
}
267267

268-
public void restoreContext(WorkflowDefinition definition, TaskContext context) {}
268+
public void restoreContext(WorkflowContext workflow, TaskContext context) {}
269269
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,15 @@
2828
import java.net.URI;
2929
import java.util.Map;
3030
import java.util.Optional;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
3133

3234
public class WorkflowUtils {
3335

3436
private WorkflowUtils() {}
3537

38+
private static final Logger logger = LoggerFactory.getLogger(WorkflowUtils.class);
39+
3640
public static Optional<SchemaValidator> getSchemaValidator(
3741
SchemaValidatorFactory validatorFactory, ResourceLoader resourceLoader, SchemaUnion schema) {
3842
if (schema != null) {
@@ -138,4 +142,12 @@ public static String toString(UriTemplate template) {
138142
URI uri = template.getLiteralUri();
139143
return uri != null ? uri.toString() : template.getLiteralUriTemplate();
140144
}
145+
146+
public static void safeClose(AutoCloseable closeable) {
147+
try {
148+
closeable.close();
149+
} catch (Exception ex) {
150+
logger.warn("Error closing resource {}", closeable.getClass().getName(), ex);
151+
}
152+
}
141153
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ private CompletableFuture<TaskContext> executeNext(
192192
public CompletableFuture<TaskContext> apply(
193193
WorkflowContext workflowContext, Optional<TaskContext> parentContext, WorkflowModel input) {
194194
TaskContext taskContext = new TaskContext(input, position, parentContext, taskName, task);
195-
workflowContext.instance().restoreContext(workflowContext.definition(), taskContext);
195+
workflowContext.instance().restoreContext(workflowContext, taskContext);
196196
CompletableFuture<TaskContext> completable = CompletableFuture.completedFuture(taskContext);
197197
if (taskContext.isCompleted() && !TaskExecutorHelper.isActive(workflowContext)) {
198198
return completable;

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package io.serverlessworkflow.impl.lifecycle;
1717

18-
public interface WorkflowExecutionListener {
18+
public interface WorkflowExecutionListener extends AutoCloseable {
1919

2020
default void onWorkflowStarted(WorkflowStartedEvent ev) {}
2121

@@ -42,4 +42,7 @@ default void onTaskSuspended(TaskSuspendedEvent ev) {}
4242
default void onTaskResumed(TaskResumedEvent ev) {}
4343

4444
default void onTaskRetried(TaskRetriedEvent ev) {}
45+
46+
@Override
47+
default void close() {}
4548
}

impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModel.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl.expressions.jq;
1717

1818
import com.fasterxml.jackson.databind.JsonNode;
19+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
1920
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
2021
import com.fasterxml.jackson.databind.node.ArrayNode;
2122
import com.fasterxml.jackson.databind.node.BooleanNode;
@@ -29,6 +30,7 @@
2930
import java.util.Optional;
3031

3132
@JsonSerialize(using = JacksonModelSerializer.class)
33+
@JsonDeserialize(using = JacksonModelDeserializer.class)
3234
public class JacksonModel implements WorkflowModel {
3335

3436
protected JsonNode node;
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.expressions.jq;
17+
18+
import com.fasterxml.jackson.core.JacksonException;
19+
import com.fasterxml.jackson.core.JsonParser;
20+
import com.fasterxml.jackson.databind.DeserializationContext;
21+
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
22+
import java.io.IOException;
23+
24+
public class JacksonModelDeserializer extends StdDeserializer<JacksonModel> {
25+
26+
private static final long serialVersionUID = 1L;
27+
28+
protected JacksonModelDeserializer() {
29+
super(JacksonModel.class);
30+
}
31+
32+
@Override
33+
public JacksonModel deserialize(JsonParser p, DeserializationContext ctxt)
34+
throws IOException, JacksonException {
35+
return new JacksonModel(p.readValueAsTree());
36+
}
37+
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,11 @@ public Object readObject() {
108108
case INSTANT:
109109
return readInstant();
110110

111-
default:
111+
case CUSTOM:
112112
return readCustomObject();
113+
114+
default:
115+
throw new IllegalStateException("Unsupported type " + type);
113116
}
114117
}
115118

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ public WorkflowOutputBuffer writeObject(Object object) {
9999
writeType(Type.BYTES);
100100
writeBytes(bytes);
101101
} else {
102+
writeType(Type.CUSTOM);
102103
writeCustomObject(object);
103104
}
104105
return this;
@@ -109,15 +110,14 @@ protected void writeClass(Class<?> objectClass) {
109110
}
110111

111112
protected void writeCustomObject(Object object) {
112-
customMarshallers.stream()
113-
.filter(m -> m.getObjectClass().isAssignableFrom(object.getClass()))
114-
.findFirst()
115-
.ifPresentOrElse(
116-
m -> {
117-
writeClass(m.getObjectClass());
118-
m.write(this, m.getObjectClass().cast(object));
119-
},
120-
() -> new IllegalArgumentException("Unsupported type " + object.getClass()));
113+
CustomObjectMarshaller marshaller =
114+
customMarshallers.stream()
115+
.filter(m -> m.getObjectClass().isAssignableFrom(object.getClass()))
116+
.findFirst()
117+
.orElseThrow(
118+
() -> new IllegalArgumentException("Unsupported type " + object.getClass()));
119+
writeClass(marshaller.getObjectClass());
120+
marshaller.write(this, marshaller.getObjectClass().cast(object));
121121
}
122122

123123
protected void writeType(Type type) {

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/DefaultBufferFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,19 @@
1818
import java.io.InputStream;
1919
import java.io.OutputStream;
2020
import java.util.Collection;
21+
import java.util.ServiceLoader;
2122

2223
public class DefaultBufferFactory implements WorkflowBufferFactory {
2324

2425
private final Collection<CustomObjectMarshaller> marshallers;
2526

27+
public static DefaultBufferFactory factory() {
28+
return new DefaultBufferFactory(
29+
ServiceLoader.load(CustomObjectMarshaller.class).stream()
30+
.map(ServiceLoader.Provider::get)
31+
.toList());
32+
}
33+
2634
public DefaultBufferFactory(Collection<CustomObjectMarshaller> marshallers) {
2735
this.marshallers = marshallers;
2836
}

0 commit comments

Comments
 (0)