diff --git a/impl/README.md b/impl/README.md index d7649733..e041285d 100644 --- a/impl/README.md +++ b/impl/README.md @@ -213,6 +213,10 @@ As shown in previous examples, to start a new workflow instance, first a [Workfl Once started, and before it completes, a workflow instance execution can be suspended or cancelled. Once cancelled, a workflow instance is done, while a suspended one might be resumed. +## Persistence + +Workflow progress might be recorded into DB. See [details](persistence/README.md) + ## Fluent Java DSL Prefer building workflows programmatically with type-safe builders and recipes? diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java index 152a3f61..9634c5f7 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -37,6 +37,7 @@ public class TaskContext implements TaskContextData { private WorkflowModel rawOutput; private Instant completedAt; private TransitionInfo transition; + private boolean completed; public TaskContext( WorkflowModel input, @@ -109,6 +110,7 @@ public WorkflowModel rawOutput() { public TaskContext output(WorkflowModel output) { this.output = output; + this.completed = true; return this; } @@ -159,6 +161,10 @@ public TaskContext transition(TransitionInfo transition) { return this; } + public boolean isCompleted() { + return completed; + } + @Override public String toString() { return "TaskContext [position=" diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 8e25e651..9e4cc008 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -15,6 +15,8 @@ */ package io.serverlessworkflow.impl; +import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; + import io.serverlessworkflow.api.types.SchemaInline; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.events.EventConsumer; @@ -39,13 +41,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class WorkflowApplication implements AutoCloseable { - private static final Logger logger = LoggerFactory.getLogger(WorkflowApplication.class); - private final TaskExecutorFactory taskFactory; private final ExpressionFactory exprFactory; private final ResourceLoaderFactory resourceLoaderFactory; @@ -271,14 +269,11 @@ public void close() { safeClose(definition); } definitions.clear(); - } - private void safeClose(AutoCloseable closeable) { - try { - closeable.close(); - } catch (Exception ex) { - logger.warn("Error closing resource {}", closeable.getClass().getName(), ex); + for (WorkflowExecutionListener listener : listeners) { + safeClose(listener); } + listeners.clear(); } public WorkflowPositionFactory positionFactory() { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index e0805c39..b79d86bb 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -25,6 +25,8 @@ import io.serverlessworkflow.impl.resources.ResourceLoader; import io.serverlessworkflow.impl.schema.SchemaValidator; import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData { @@ -37,6 +39,7 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData private final WorkflowApplication application; private final TaskExecutor taskExecutor; private final ResourceLoader resourceLoader; + private final Map> executors = new HashMap<>(); private WorkflowDefinition( WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) { @@ -70,7 +73,9 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow, } public WorkflowInstance instance(Object input) { - return new WorkflowMutableInstance(this, application.modelFactory().fromAny(input)); + WorkflowModel inputModel = application.modelFactory().fromAny(input); + inputSchemaValidator().ifPresent(v -> v.validate(inputModel)); + return new WorkflowMutableInstance(this, application().idFactory().get(), inputModel); } Optional inputSchemaValidator() { @@ -107,8 +112,14 @@ public ResourceLoader resourceLoader() { return resourceLoader; } - @Override - public void close() { - // TODO close resourcers hold for uncompleted process instances, if any + public TaskExecutor taskExecutor(String jsonPointer) { + return executors.get(jsonPointer); + } + + public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor taskExecutor) { + executors.put(position.jsonPointer(), taskExecutor); } + + @Override + public void close() {} } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index c5065c36..7409a1db 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -38,32 +38,43 @@ public class WorkflowMutableInstance implements WorkflowInstance { protected final AtomicReference status; - private final String id; - private final WorkflowModel input; + protected final String id; + protected final WorkflowModel input; + + protected final WorkflowContext workflowContext; + protected Instant startedAt; + + protected AtomicReference> futureRef = new AtomicReference<>(); + protected Instant completedAt; - private WorkflowContext workflowContext; - private Instant startedAt; - private Instant completedAt; - private volatile WorkflowModel output; private Lock statusLock = new ReentrantLock(); - private CompletableFuture completableFuture; private Map, TaskContext> suspended; - WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) { - this.id = definition.application().idFactory().get(); + protected WorkflowMutableInstance(WorkflowDefinition definition, String id, WorkflowModel input) { + this.id = id; this.input = input; this.status = new AtomicReference<>(WorkflowStatus.PENDING); - definition.inputSchemaValidator().ifPresent(v -> v.validate(input)); this.workflowContext = new WorkflowContext(definition, this); } @Override public CompletableFuture start() { - this.startedAt = Instant.now(); - this.status.set(WorkflowStatus.RUNNING); - publishEvent( - workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext))); - this.completableFuture = + return startExecution( + () -> { + startedAt = Instant.now(); + publishEvent( + workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext))); + }); + } + + protected final CompletableFuture startExecution(Runnable runnable) { + CompletableFuture future = futureRef.get(); + if (future != null) { + return future; + } + status.set(WorkflowStatus.RUNNING); + runnable.run(); + future = TaskExecutorHelper.processTaskList( workflowContext.definition().startTask(), workflowContext, @@ -75,7 +86,8 @@ public CompletableFuture start() { .orElse(input)) .whenComplete(this::whenFailed) .thenApply(this::whenSuccess); - return completableFuture; + futureRef.set(future); + return future; } private void whenFailed(WorkflowModel result, Throwable ex) { @@ -94,7 +106,7 @@ private void handleException(Throwable ex) { } private WorkflowModel whenSuccess(WorkflowModel node) { - output = + WorkflowModel output = workflowContext .definition() .outputFilter() @@ -103,7 +115,8 @@ private WorkflowModel whenSuccess(WorkflowModel node) { workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output)); status.set(WorkflowStatus.COMPLETED); publishEvent( - workflowContext, l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext))); + workflowContext, + l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext, output))); return output; } @@ -134,11 +147,13 @@ public WorkflowStatus status() { @Override public WorkflowModel output() { - return output; + CompletableFuture future = futureRef.get(); + return future != null ? future.join() : null; } @Override public T outputAs(Class clazz) { + WorkflowModel output = output(); return output != null ? output .as(clazz) @@ -171,8 +186,7 @@ public boolean suspend() { try { statusLock.lock(); if (TaskExecutorHelper.isActive(status.get()) && suspended == null) { - suspended = new ConcurrentHashMap<>(); - status.set(WorkflowStatus.SUSPENDED); + internalSuspend(); publishEvent( workflowContext, l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext))); @@ -185,6 +199,11 @@ public boolean suspend() { } } + protected final void internalSuspend() { + suspended = new ConcurrentHashMap<>(); + status.set(WorkflowStatus.SUSPENDED); + } + @Override public boolean resume() { try { @@ -253,4 +272,6 @@ public boolean cancel() { statusLock.unlock(); } } + + public void restoreContext(WorkflowContext workflow, TaskContext context) {} } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java index db056676..491dc2aa 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java @@ -28,11 +28,15 @@ import java.net.URI; import java.util.Map; import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class WorkflowUtils { private WorkflowUtils() {} + private static final Logger logger = LoggerFactory.getLogger(WorkflowUtils.class); + public static Optional getSchemaValidator( SchemaValidatorFactory validatorFactory, ResourceLoader resourceLoader, SchemaUnion schema) { if (schema != null) { @@ -138,4 +142,14 @@ public static String toString(UriTemplate template) { URI uri = template.getLiteralUri(); return uri != null ? uri.toString() : template.getLiteralUriTemplate(); } + + public static void safeClose(AutoCloseable closeable) { + if (closeable != null) { + try { + closeable.close(); + } catch (Exception ex) { + logger.warn("Error closing resource {}", closeable.getClass().getName(), ex); + } + } + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index ba3cae0d..5c80adc8 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -78,11 +78,13 @@ public abstract static class AbstractTaskExecutorBuilder< protected final WorkflowApplication application; protected final Workflow workflow; protected final ResourceLoader resourceLoader; + private final WorkflowDefinition definition; private V instance; protected AbstractTaskExecutorBuilder( WorkflowMutablePosition position, T task, WorkflowDefinition definition) { + this.definition = definition; this.workflow = definition.workflow(); this.taskName = position.last().toString(); this.position = position; @@ -147,6 +149,7 @@ public V build() { if (instance == null) { instance = buildInstance(); buildTransition(instance); + definition.addTaskExecutor(position, instance); } return instance; } @@ -189,11 +192,13 @@ private CompletableFuture executeNext( public CompletableFuture apply( WorkflowContext workflowContext, Optional parentContext, WorkflowModel input) { TaskContext taskContext = new TaskContext(input, position, parentContext, taskName, task); + workflowContext.instance().restoreContext(workflowContext, taskContext); CompletableFuture completable = CompletableFuture.completedFuture(taskContext); if (!TaskExecutorHelper.isActive(workflowContext)) { return completable; - } - if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) { + } else if (taskContext.isCompleted()) { + return executeNext(completable, workflowContext); + } else if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) { return executeNext( completable .thenCompose(workflowContext.instance()::suspendedCheck) @@ -256,6 +261,10 @@ private void handleException( } } + public WorkflowPosition position() { + return position; + } + protected abstract TransitionInfo getSkipTransition(); protected abstract CompletableFuture execute( diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowCompletedEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowCompletedEvent.java index 727a28e7..cae1a6d0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowCompletedEvent.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowCompletedEvent.java @@ -16,10 +16,18 @@ package io.serverlessworkflow.impl.lifecycle; import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowModel; public class WorkflowCompletedEvent extends WorkflowEvent { - public WorkflowCompletedEvent(WorkflowContextData workflow) { + private WorkflowModel output; + + public WorkflowCompletedEvent(WorkflowContextData workflow, WorkflowModel output) { super(workflow); + this.output = output; + } + + public WorkflowModel output() { + return output; } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java index 8d89fac7..e88e8cbd 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java @@ -15,7 +15,7 @@ */ package io.serverlessworkflow.impl.lifecycle; -public interface WorkflowExecutionListener { +public interface WorkflowExecutionListener extends AutoCloseable { default void onWorkflowStarted(WorkflowStartedEvent ev) {} @@ -42,4 +42,7 @@ default void onTaskSuspended(TaskSuspendedEvent ev) {} default void onTaskResumed(TaskResumedEvent ev) {} default void onTaskRetried(TaskRetriedEvent ev) {} + + @Override + default void close() {} } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java index ade29db3..374c6751 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java @@ -225,7 +225,8 @@ public void onWorkflowCompleted(WorkflowCompletedEvent event) { builder() .withData( cloudEventData( - new WorkflowCompletedCEData(id(ev), ref(ev), ev.eventDate(), output(ev)), + new WorkflowCompletedCEData( + id(ev), ref(ev), ev.eventDate(), from(event.output())), this::convert)) .withType(WORKFLOW_COMPLETED) .build()); @@ -328,10 +329,6 @@ private static String pos(TaskEvent ev) { return ev.taskContext().position().jsonPointer(); } - private static Object output(WorkflowEvent ev) { - return from(ev.workflowContext().instanceData().output()); - } - private static Object output(TaskEvent ev) { return from(ev.taskContext().output()); } diff --git a/impl/jackson/pom.xml b/impl/jackson/pom.xml index d0a1f90c..fb1d1499 100644 --- a/impl/jackson/pom.xml +++ b/impl/jackson/pom.xml @@ -6,7 +6,7 @@ 8.0.0-SNAPSHOT serverlessworkflow-impl-jackson - Serverless Workflow :: Impl :: HTTP + Serverless Workflow :: Impl :: Jackson io.serverlessworkflow diff --git a/impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModel.java b/impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModel.java index b2054abf..c3a61893 100644 --- a/impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModel.java +++ b/impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModel.java @@ -16,6 +16,7 @@ package io.serverlessworkflow.impl.expressions.jq; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.BooleanNode; @@ -29,6 +30,7 @@ import java.util.Optional; @JsonSerialize(using = JacksonModelSerializer.class) +@JsonDeserialize(using = JacksonModelDeserializer.class) public class JacksonModel implements WorkflowModel { protected JsonNode node; diff --git a/impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModelDeserializer.java b/impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModelDeserializer.java new file mode 100644 index 00000000..73d10bc1 --- /dev/null +++ b/impl/jackson/src/main/java/io/serverlessworkflow/impl/expressions/jq/JacksonModelDeserializer.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.expressions.jq; + +import com.fasterxml.jackson.core.JacksonException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import java.io.IOException; + +public class JacksonModelDeserializer extends StdDeserializer { + + private static final long serialVersionUID = 1L; + + protected JacksonModelDeserializer() { + super(JacksonModel.class); + } + + @Override + public JacksonModel deserialize(JsonParser p, DeserializationContext ctxt) + throws IOException, JacksonException { + return new JacksonModel(p.readValueAsTree()); + } +} diff --git a/impl/persistence/README.md b/impl/persistence/README.md new file mode 100644 index 00000000..43b4ca84 --- /dev/null +++ b/impl/persistence/README.md @@ -0,0 +1,13 @@ +[![Gitpod ready-to-code](https://img.shields.io/badge/Gitpod-ready--to--code-blue?logo=gitpod)](https://gitpod.io/#https://github.com/serverlessworkflow/sdk-java) + +# Serverless Workflow Specification — Java SDK (Reference Implementation)- Persistence + +Workflow persistence aim is to be able to restore workflow instances execution in the event of a JVM stop. To do that, progress of every running instance is persisted into the underlying DB by using life cycle events. Later on, when a new JVM is instantiated, the application is expected to manually start those instances that are not longer being executed by any other JVM, using the information previously stored. + +Currently, persistence structure has been layout for key-value store dbs, plus one concrete implementation using [H2 MVStore](mvstore/README.md). Next step will do the same for relational dbs (table layout plus concrete implementation using Postgresql). + +Map of key values has been given precedence because, when persisting the status of a running workflow instance, the number of writes are usually large, while read only operations are only performed when the JVM starts up. This give a performance edge for this kind of db over relational ones. + +--- + +*Questions or ideas? PRs and issues welcome!* diff --git a/impl/persistence/api/pom.xml b/impl/persistence/api/pom.xml new file mode 100644 index 00000000..f67a12e6 --- /dev/null +++ b/impl/persistence/api/pom.xml @@ -0,0 +1,17 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-persistence + 8.0.0-SNAPSHOT + + serverlessworkflow-persistence-api + Serverless Workflow :: Impl :: Persistence:: API + + + io.serverlessworkflow + serverlessworkflow-impl-core + 8.0.0-SNAPSHOT + + + \ No newline at end of file diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java new file mode 100644 index 00000000..b7971717 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java @@ -0,0 +1,140 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.marshaller; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.Map; + +public abstract class AbstractInputBuffer implements WorkflowInputBuffer { + + private final Collection customMarshallers; + + protected AbstractInputBuffer(Collection customMarshallers) { + this.customMarshallers = customMarshallers; + } + + @Override + public > T readEnum(Class enumClass) { + return Enum.valueOf(enumClass, readString()); + } + + @Override + public Instant readInstant() { + return Instant.ofEpochMilli(readLong()); + } + + @Override + public Map readMap() { + int size = readInt(); + Map map = new LinkedHashMap(size); + while (size-- > 0) { + map.put(readString(), readObject()); + } + return map; + } + + @Override + public Collection readCollection() { + int size = readInt(); + Collection col = new ArrayList<>(size); + while (size-- > 0) { + col.add(readObject()); + } + return col; + } + + protected Type readType() { + return Type.values()[readByte()]; + } + + @Override + public Object readObject() { + + Type type = readType(); + + switch (type) { + case NULL: + return null; + + case SHORT: + return readShort(); + + case LONG: + return readLong(); + + case INT: + return readInt(); + + case BYTE: + return readByte(); + + case BYTES: + return readBytes(); + + case FLOAT: + return readFloat(); + + case DOUBLE: + return readDouble(); + + case BOOLEAN: + return readBoolean(); + + case STRING: + return readString(); + + case MAP: + return readMap(); + + case COLLECTION: + return readCollection(); + + case INSTANT: + return readInstant(); + + case CUSTOM: + return readCustomObject(); + + default: + throw new IllegalStateException("Unsupported type " + type); + } + } + + protected Class readClass() { + String className = readString(); + try { + return Class.forName(className); + } catch (ClassNotFoundException ex) { + throw new IllegalStateException(ex); + } + } + + protected Class loadClass(String className) throws ClassNotFoundException { + return Class.forName(className); + } + + protected Object readCustomObject() { + Class objectClass = readClass(); + return customMarshallers.stream() + .filter(m -> m.getObjectClass().isAssignableFrom(objectClass)) + .findFirst() + .map(m -> m.read(this)) + .orElseThrow(() -> new IllegalArgumentException("Unsupported type " + objectClass)); + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java new file mode 100644 index 00000000..fb002800 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java @@ -0,0 +1,126 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.marshaller; + +import java.time.Instant; +import java.util.Collection; +import java.util.Map; + +public abstract class AbstractOutputBuffer implements WorkflowOutputBuffer { + + private final Collection customMarshallers; + + protected AbstractOutputBuffer(Collection customMarshallers) { + this.customMarshallers = customMarshallers; + } + + @Override + public WorkflowOutputBuffer writeInstant(Instant instant) { + writeLong(instant.getEpochSecond()); + return this; + } + + @Override + public > WorkflowOutputBuffer writeEnum(T value) { + writeString(value.name()); + return this; + } + + @Override + public WorkflowOutputBuffer writeMap(Map map) { + writeInt(map.size()); + map.forEach( + (k, v) -> { + writeString(k); + writeObject(v); + }); + + return this; + } + + @Override + public WorkflowOutputBuffer writeCollection(Collection col) { + writeInt(col.size()); + col.forEach(this::writeObject); + return this; + } + + @Override + public WorkflowOutputBuffer writeObject(Object object) { + if (object == null) { + writeType(Type.NULL); + } else if (object instanceof Short number) { + writeType(Type.SHORT); + writeShort(number); + } else if (object instanceof Integer number) { + writeType(Type.INT); + writeInt(number); + } else if (object instanceof Long number) { + writeType(Type.LONG); + writeLong(number); + } else if (object instanceof Byte number) { + writeType(Type.BYTE); + writeLong(number); + } else if (object instanceof Float number) { + writeType(Type.FLOAT); + writeFloat(number); + } else if (object instanceof Double number) { + writeType(Type.DOUBLE); + writeDouble(number); + } else if (object instanceof Boolean bool) { + writeType(Type.BOOLEAN); + writeBoolean(bool); + } else if (object instanceof String str) { + writeType(Type.STRING); + writeString(str); + } else if (object instanceof Map value) { + writeType(Type.MAP); + writeMap(value); + } else if (object instanceof Collection value) { + writeType(Type.COLLECTION); + writeCollection(value); + } else if (object instanceof Instant value) { + writeType(Type.INSTANT); + writeInstant(value); + } else if (object instanceof byte[] bytes) { + writeType(Type.BYTES); + writeBytes(bytes); + } else { + writeType(Type.CUSTOM); + writeCustomObject(object); + } + return this; + } + + protected void writeClass(Class objectClass) { + writeString(objectClass.getCanonicalName()); + } + + protected void writeCustomObject(Object object) { + CustomObjectMarshaller marshaller = + customMarshallers.stream() + .filter(m -> m.getObjectClass().isAssignableFrom(object.getClass())) + .findFirst() + .orElseThrow( + () -> new IllegalArgumentException("Unsupported type " + object.getClass())); + writeClass(marshaller.getObjectClass()); + marshaller.write(this, marshaller.getObjectClass().cast(object)); + } + + protected void writeType(Type type) { + writeByte((byte) type.ordinal()); + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/CustomObjectMarshaller.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/CustomObjectMarshaller.java new file mode 100644 index 00000000..b96283b6 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/CustomObjectMarshaller.java @@ -0,0 +1,24 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.marshaller; + +public interface CustomObjectMarshaller { + void write(WorkflowOutputBuffer buffer, T object); + + T read(WorkflowInputBuffer buffer); + + Class getObjectClass(); +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/DefaultBufferFactory.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/DefaultBufferFactory.java new file mode 100644 index 00000000..ba3d2cc8 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/DefaultBufferFactory.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.marshaller; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collection; +import java.util.ServiceLoader; + +public class DefaultBufferFactory implements WorkflowBufferFactory { + + private final Collection marshallers; + + private static class DefaultBufferFactoryHolder { + private static DefaultBufferFactory instance = + new DefaultBufferFactory( + ServiceLoader.load(CustomObjectMarshaller.class).stream() + .map(ServiceLoader.Provider::get) + .toList()); + } + + public static DefaultBufferFactory factory() { + return DefaultBufferFactoryHolder.instance; + } + + protected DefaultBufferFactory(Collection marshallers) { + this.marshallers = marshallers; + } + + @Override + public WorkflowInputBuffer input(InputStream input) { + return new DefaultInputBuffer(input, marshallers); + } + + @Override + public WorkflowOutputBuffer output(OutputStream output) { + return new DefaultOutputBuffer(output, marshallers); + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/DefaultInputBuffer.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/DefaultInputBuffer.java new file mode 100644 index 00000000..ac89fbe0 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/DefaultInputBuffer.java @@ -0,0 +1,122 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.marshaller; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Collection; + +public class DefaultInputBuffer extends AbstractInputBuffer { + + private DataInputStream input; + + public DefaultInputBuffer(InputStream in, Collection marshallers) { + super(marshallers); + input = new DataInputStream(in); + } + + @Override + public String readString() { + try { + return input.readUTF(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public int readInt() { + try { + return input.readInt(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public short readShort() { + try { + return input.readShort(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public long readLong() { + try { + return input.readLong(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public float readFloat() { + try { + return input.readFloat(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public double readDouble() { + try { + return input.readFloat(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public boolean readBoolean() { + try { + return input.readBoolean(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public byte readByte() { + try { + return input.readByte(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public byte[] readBytes() { + try { + return input.readNBytes(readInt()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void close() { + try { + input.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/DefaultOutputBuffer.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/DefaultOutputBuffer.java new file mode 100644 index 00000000..1d519279 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/DefaultOutputBuffer.java @@ -0,0 +1,133 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.marshaller; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.util.Collection; + +public class DefaultOutputBuffer extends AbstractOutputBuffer { + + private DataOutputStream output; + + public DefaultOutputBuffer( + OutputStream out, Collection customMarshallers) { + super(customMarshallers); + output = new DataOutputStream(out); + } + + @Override + public WorkflowOutputBuffer writeString(String text) { + try { + output.writeUTF(text); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return this; + } + + @Override + public WorkflowOutputBuffer writeInt(int number) { + try { + output.writeInt(number); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return this; + } + + @Override + public WorkflowOutputBuffer writeShort(short number) { + try { + output.writeShort(number); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return this; + } + + @Override + public WorkflowOutputBuffer writeLong(long number) { + try { + output.writeLong(number); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return this; + } + + @Override + public WorkflowOutputBuffer writeFloat(float number) { + try { + output.writeFloat(number); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return this; + } + + @Override + public WorkflowOutputBuffer writeDouble(double number) { + try { + output.writeDouble(number); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return this; + } + + @Override + public WorkflowOutputBuffer writeBoolean(boolean bool) { + try { + output.writeBoolean(bool); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return this; + } + + @Override + public WorkflowOutputBuffer writeByte(byte one) { + try { + output.writeByte(one); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return this; + } + + @Override + public WorkflowOutputBuffer writeBytes(byte[] bytes) { + try { + writeInt(bytes.length); + output.write(bytes); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return this; + } + + @Override + public void close() { + try { + output.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/Type.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/Type.java new file mode 100644 index 00000000..cf52b7ab --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/Type.java @@ -0,0 +1,33 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.marshaller; + +enum Type { + BYTE, + BYTES, + INT, + SHORT, + LONG, + FLOAT, + DOUBLE, + BOOLEAN, + STRING, + INSTANT, + MAP, + COLLECTION, + NULL, + CUSTOM +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowBufferFactory.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowBufferFactory.java new file mode 100644 index 00000000..78b27c2b --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowBufferFactory.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.marshaller; + +import java.io.InputStream; +import java.io.OutputStream; + +public interface WorkflowBufferFactory { + + WorkflowInputBuffer input(InputStream input); + + WorkflowOutputBuffer output(OutputStream output); +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowInputBuffer.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowInputBuffer.java new file mode 100644 index 00000000..f45567df --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowInputBuffer.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.marshaller; + +import java.io.Closeable; +import java.time.Instant; +import java.util.Collection; +import java.util.Map; + +public interface WorkflowInputBuffer extends Closeable { + + String readString(); + + int readInt(); + + short readShort(); + + long readLong(); + + float readFloat(); + + double readDouble(); + + boolean readBoolean(); + + byte readByte(); + + byte[] readBytes(); + + > T readEnum(Class enumClass); + + Instant readInstant(); + + Map readMap(); + + Collection readCollection(); + + Object readObject(); + + void close(); +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowOutputBuffer.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowOutputBuffer.java new file mode 100644 index 00000000..f7ec25cf --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowOutputBuffer.java @@ -0,0 +1,53 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.marshaller; + +import java.time.Instant; +import java.util.Collection; +import java.util.Map; + +public interface WorkflowOutputBuffer extends AutoCloseable { + + WorkflowOutputBuffer writeString(String text); + + WorkflowOutputBuffer writeInt(int number); + + WorkflowOutputBuffer writeShort(short number); + + WorkflowOutputBuffer writeLong(long number); + + WorkflowOutputBuffer writeFloat(float number); + + WorkflowOutputBuffer writeDouble(double number); + + WorkflowOutputBuffer writeBoolean(boolean bool); + + WorkflowOutputBuffer writeByte(byte one); + + WorkflowOutputBuffer writeBytes(byte[] bytes); + + WorkflowOutputBuffer writeInstant(Instant instant); + + WorkflowOutputBuffer writeMap(Map map); + + WorkflowOutputBuffer writeCollection(Collection col); + + WorkflowOutputBuffer writeObject(Object object); + + > WorkflowOutputBuffer writeEnum(T value); + + void close(); +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceApplicationBuilder.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceApplicationBuilder.java new file mode 100644 index 00000000..462cf6ca --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceApplicationBuilder.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowApplication.Builder; + +public class PersistenceApplicationBuilder { + + public static PersistenceApplicationBuilder builder( + WorkflowApplication.Builder builder, PersistenceInstanceWriter writer) { + return new PersistenceApplicationBuilder(builder, writer); + } + + private final PersistenceInstanceWriter writer; + private final WorkflowApplication.Builder appBuilder; + + protected PersistenceApplicationBuilder(Builder appBuilder, PersistenceInstanceWriter writer) { + this.appBuilder = appBuilder; + this.writer = writer; + } + + public WorkflowApplication build() { + appBuilder.withListener(new WorkflowPersistenceListener(writer)); + return appBuilder.build(); + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java new file mode 100644 index 00000000..4d470af1 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java @@ -0,0 +1,44 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; + +public abstract class PersistenceInstanceHandlers implements AutoCloseable { + + protected final PersistenceInstanceWriter writer; + protected final PersistenceInstanceReader reader; + + protected PersistenceInstanceHandlers( + PersistenceInstanceWriter writer, PersistenceInstanceReader reader) { + this.writer = writer; + this.reader = reader; + } + + public PersistenceInstanceWriter writer() { + return writer; + } + + public PersistenceInstanceReader reader() { + return reader; + } + + @Override + public void close() { + safeClose(writer); + safeClose(reader); + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java new file mode 100644 index 00000000..5678e894 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java @@ -0,0 +1,33 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; + +public interface PersistenceInstanceReader extends AutoCloseable { + Map readAll(WorkflowDefinition definition); + + Map read(WorkflowDefinition definition, Collection instanceIds); + + Optional read(WorkflowDefinition definition, String instanceId); + + @Override + default void close() {} +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java new file mode 100644 index 00000000..ae75ef38 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java @@ -0,0 +1,41 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowContextData; + +public interface PersistenceInstanceWriter extends AutoCloseable { + + void started(WorkflowContextData workflowContext); + + void completed(WorkflowContextData workflowContext); + + void failed(WorkflowContextData workflowContext, Throwable ex); + + void aborted(WorkflowContextData workflowContext); + + void suspended(WorkflowContextData workflowContext); + + void resumed(WorkflowContextData workflowContext); + + void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext); + + void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext); + + @Override + default void close() {} +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceTaskInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceTaskInfo.java new file mode 100644 index 00000000..2e6c688b --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceTaskInfo.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.WorkflowModel; +import java.time.Instant; + +public record PersistenceTaskInfo( + Instant instant, + WorkflowModel model, + WorkflowModel context, + Boolean isEndNode, + String nextPosition) {} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceWorkflowInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceWorkflowInfo.java new file mode 100644 index 00000000..0c5d27bf --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceWorkflowInfo.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowStatus; +import java.time.Instant; +import java.util.Map; + +public record PersistenceWorkflowInfo( + String id, + Instant startedAt, + WorkflowModel input, + WorkflowStatus status, + Map tasks) {} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java new file mode 100644 index 00000000..335b87d0 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java @@ -0,0 +1,62 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowMutableInstance; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.executors.TransitionInfo; +import java.util.concurrent.CompletableFuture; + +public class WorkflowPersistenceInstance extends WorkflowMutableInstance { + + private final PersistenceWorkflowInfo info; + + public WorkflowPersistenceInstance(WorkflowDefinition definition, PersistenceWorkflowInfo info) { + super(definition, info.id(), info.input()); + this.info = info; + } + + @Override + public CompletableFuture start() { + return startExecution( + () -> { + startedAt = info.startedAt(); + if (info.status() == WorkflowStatus.SUSPENDED) { + internalSuspend(); + } + }); + } + + @Override + public void restoreContext(WorkflowContext workflow, TaskContext context) { + PersistenceTaskInfo taskInfo = info.tasks().remove(context.position().jsonPointer()); + if (taskInfo != null) { + context.output(taskInfo.model()); + context.completedAt(taskInfo.instant()); + context.transition( + new TransitionInfo( + taskInfo.nextPosition() == null + ? null + : workflow.definition().taskExecutor(taskInfo.nextPosition()), + taskInfo.isEndNode())); + workflow.context(taskInfo.context()); + } + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java new file mode 100644 index 00000000..d03da00d --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceListener.java @@ -0,0 +1,81 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; + +import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; +import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; + +public class WorkflowPersistenceListener implements WorkflowExecutionListener { + + private final PersistenceInstanceWriter persistenceWriter; + + public WorkflowPersistenceListener(PersistenceInstanceWriter persistenceWriter) { + this.persistenceWriter = persistenceWriter; + } + + @Override + public void onWorkflowStarted(WorkflowStartedEvent ev) { + persistenceWriter.started(ev.workflowContext()); + } + + @Override + public void onWorkflowFailed(WorkflowFailedEvent ev) { + persistenceWriter.failed(ev.workflowContext(), ev.cause()); + } + + @Override + public void onWorkflowCancelled(WorkflowCancelledEvent ev) { + persistenceWriter.aborted(ev.workflowContext()); + } + + @Override + public void onWorkflowSuspended(WorkflowSuspendedEvent ev) { + persistenceWriter.suspended(ev.workflowContext()); + } + + @Override + public void onWorkflowResumed(WorkflowResumedEvent ev) { + persistenceWriter.resumed(ev.workflowContext()); + } + + @Override + public void onWorkflowCompleted(WorkflowCompletedEvent ev) { + persistenceWriter.completed(ev.workflowContext()); + } + + @Override + public void onTaskStarted(TaskStartedEvent ev) { + persistenceWriter.taskStarted(ev.workflowContext(), ev.taskContext()); + } + + @Override + public void onTaskCompleted(TaskCompletedEvent ev) { + persistenceWriter.taskCompleted(ev.workflowContext(), ev.taskContext()); + } + + public void close() { + safeClose(persistenceWriter); + } +} diff --git a/impl/persistence/bigmap/pom.xml b/impl/persistence/bigmap/pom.xml new file mode 100644 index 00000000..9ad508a1 --- /dev/null +++ b/impl/persistence/bigmap/pom.xml @@ -0,0 +1,16 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-persistence + 8.0.0-SNAPSHOT + + serverlessworkflow-persistence-big-map + Serverless Workflow :: Impl :: Persistence:: BigMap + + + io.serverlessworkflow + serverlessworkflow-persistence-api + + + \ No newline at end of file diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapIdInstanceWriter.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapIdInstanceWriter.java new file mode 100644 index 00000000..25ca9e4f --- /dev/null +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapIdInstanceWriter.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.bigmap; + +import io.serverlessworkflow.impl.WorkflowContextData; + +public abstract class BigMapIdInstanceWriter + extends BigMapInstanceWriter { + + protected BigMapIdInstanceWriter(BigMapInstanceStore store) { + super(store); + } + + @Override + protected String key(WorkflowContextData workflowContext) { + return workflowContext.instanceData().id(); + } +} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceReader.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceReader.java new file mode 100644 index 00000000..e12de9b8 --- /dev/null +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceReader.java @@ -0,0 +1,141 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.bigmap; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceReader; +import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; +import io.serverlessworkflow.impl.persistence.PersistenceWorkflowInfo; +import io.serverlessworkflow.impl.persistence.WorkflowPersistenceInstance; +import java.util.Collection; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +public abstract class BigMapInstanceReader implements PersistenceInstanceReader { + + private final BigMapInstanceStore store; + + protected BigMapInstanceReader(BigMapInstanceStore store) { + this.store = store; + } + + private Result doTransaction( + Function, Result> operations) { + BigMapInstanceTransaction transaction = store.begin(); + try { + Result result = operations.apply(transaction); + transaction.commit(); + return result; + } catch (Exception ex) { + transaction.rollback(); + throw ex; + } + } + + @Override + public Map readAll(WorkflowDefinition definition) { + return doTransaction( + t -> { + Map instances = t.instanceData(definition); + Map status = t.status(definition); + return instances.entrySet().stream() + .map( + e -> + restore( + definition, + e.getKey(), + e.getValue(), + t.tasks(e.getKey()), + status.get(e.getKey()))) + .collect(Collectors.toMap(WorkflowInstance::id, i -> i)); + }); + } + + @Override + public Map read( + WorkflowDefinition definition, Collection instanceIds) { + return doTransaction( + t -> { + Map instances = t.instanceData(definition); + Map status = t.status(definition); + return instanceIds.stream() + .map(id -> read(instances, status, t.tasks(id), definition, id)) + .flatMap(Optional::stream) + .collect(Collectors.toMap(WorkflowInstance::id, id -> id)); + }); + } + + @Override + public Optional read(WorkflowDefinition definition, String instanceId) { + return doTransaction( + t -> + read( + t.instanceData(definition), + t.status(definition), + t.tasks(instanceId), + definition, + instanceId)); + } + + private Optional read( + Map instances, + Map status, + Map tasks, + WorkflowDefinition definition, + String instanceId) { + return instances.containsKey(instanceId) + ? Optional.empty() + : Optional.of( + restore( + definition, instanceId, instances.get(instanceId), tasks, status.get(instanceId))); + } + + public void close() {} + + protected WorkflowInstance restore( + WorkflowDefinition definition, + String instanceId, + V instanceData, + Map tasksData, + S status) { + return new WorkflowPersistenceInstance( + definition, readPersistenceInfo(instanceId, instanceData, tasksData, status)); + } + + protected abstract PersistenceTaskInfo unmarshallTaskInfo(T taskData); + + protected abstract PersistenceInstanceInfo unmarshallInstanceInfo(V instanceData); + + protected abstract WorkflowStatus unmarshallStatus(S statusData); + + protected PersistenceWorkflowInfo readPersistenceInfo( + String instanceId, V instanceData, Map tasksData, S status) { + PersistenceInstanceInfo instanceInfo = unmarshallInstanceInfo(instanceData); + return new PersistenceWorkflowInfo( + instanceId, + instanceInfo.startedAt(), + instanceInfo.input(), + status == null ? null : unmarshallStatus(status), + tasksData.entrySet().stream() + .collect( + Collectors.toMap(Entry::getKey, entry -> unmarshallTaskInfo(entry.getValue())))); + } +} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceStore.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceStore.java new file mode 100644 index 00000000..aa1d998e --- /dev/null +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceStore.java @@ -0,0 +1,20 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.bigmap; + +public interface BigMapInstanceStore extends AutoCloseable { + BigMapInstanceTransaction begin(); +} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java new file mode 100644 index 00000000..72b89ed1 --- /dev/null +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.bigmap; + +import io.serverlessworkflow.impl.WorkflowDefinitionData; +import java.util.Map; + +public interface BigMapInstanceTransaction { + + Map instanceData(WorkflowDefinitionData definition); + + Map status(WorkflowDefinitionData workflowContext); + + Map tasks(K instanceId); + + void cleanupTasks(K instanceId); + + void commit(); + + void rollback(); +} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java new file mode 100644 index 00000000..77f96e24 --- /dev/null +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceWriter.java @@ -0,0 +1,112 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.bigmap; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowInstanceData; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceWriter; +import java.util.function.Consumer; + +public abstract class BigMapInstanceWriter implements PersistenceInstanceWriter { + + private BigMapInstanceStore store; + + protected BigMapInstanceWriter(BigMapInstanceStore store) { + this.store = store; + } + + private void doTransaction(Consumer> operations) { + BigMapInstanceTransaction transaction = store.begin(); + try { + operations.accept(transaction); + transaction.commit(); + } catch (Exception ex) { + transaction.rollback(); + throw ex; + } + } + + @Override + public void started(WorkflowContextData workflowContext) { + doTransaction( + t -> + t.instanceData(workflowContext.definition()) + .put(key(workflowContext), marshallInstance(workflowContext.instanceData()))); + } + + @Override + public void completed(WorkflowContextData workflowContext) { + removeProcessInstance(workflowContext); + } + + @Override + public void failed(WorkflowContextData workflowContext, Throwable ex) { + removeProcessInstance(workflowContext); + } + + @Override + public void aborted(WorkflowContextData workflowContext) { + removeProcessInstance(workflowContext); + } + + @Override + public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) {} + + @Override + public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) { + doTransaction( + t -> + t.tasks(key(workflowContext)) + .put( + taskContext.position().jsonPointer(), + marshallTaskCompleted(workflowContext, (TaskContext) taskContext))); + } + + @Override + public void suspended(WorkflowContextData workflowContext) { + doTransaction( + t -> + t.status(workflowContext.definition()) + .put(key(workflowContext), marshallStatus(WorkflowStatus.SUSPENDED))); + } + + @Override + public void resumed(WorkflowContextData workflowContext) { + doTransaction(t -> t.status(workflowContext.definition()).remove(key(workflowContext))); + } + + protected void removeProcessInstance(WorkflowContextData workflowContext) { + doTransaction( + t -> { + K key = key(workflowContext); + t.instanceData(workflowContext.definition()).remove(key); + t.status(workflowContext.definition()).remove(key); + t.cleanupTasks(key); + }); + } + + protected abstract K key(WorkflowContextData workflowContext); + + protected abstract V marshallInstance(WorkflowInstanceData instance); + + protected abstract T marshallTaskCompleted( + WorkflowContextData workflowContext, TaskContext taskContext); + + protected abstract S marshallStatus(WorkflowStatus status); +} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java new file mode 100644 index 00000000..8b90ae18 --- /dev/null +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceReader.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.bigmap; + +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; +import io.serverlessworkflow.impl.marshaller.WorkflowInputBuffer; +import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; +import java.io.ByteArrayInputStream; +import java.time.Instant; + +public class BytesMapInstanceReader extends BigMapInstanceReader { + + private final WorkflowBufferFactory factory; + + public BytesMapInstanceReader( + BigMapInstanceStore store, WorkflowBufferFactory factory) { + super(store); + this.factory = factory; + } + + @Override + protected PersistenceTaskInfo unmarshallTaskInfo(byte[] taskData) { + try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(taskData))) { + buffer.readByte(); // version byte not used at the moment + Instant date = buffer.readInstant(); + WorkflowModel model = (WorkflowModel) buffer.readObject(); + WorkflowModel context = (WorkflowModel) buffer.readObject(); + Boolean isEndNode = null; + String nextPosition = null; + isEndNode = buffer.readBoolean(); + boolean hasNext = buffer.readBoolean(); + if (hasNext) { + nextPosition = buffer.readString(); + } + return new PersistenceTaskInfo(date, model, context, isEndNode, nextPosition); + } + } + + @Override + protected PersistenceInstanceInfo unmarshallInstanceInfo(byte[] instanceData) { + try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(instanceData))) { + buffer.readByte(); // version byte not used at the moment + return new PersistenceInstanceInfo(buffer.readInstant(), (WorkflowModel) buffer.readObject()); + } + } + + @Override + protected WorkflowStatus unmarshallStatus(byte[] statusData) { + try (WorkflowInputBuffer buffer = factory.input(new ByteArrayInputStream(statusData))) { + buffer.readByte(); // version byte not used at the moment + return buffer.readEnum(WorkflowStatus.class); + } + } +} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java new file mode 100644 index 00000000..320d7eff --- /dev/null +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceWriter.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.bigmap; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowInstanceData; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.executors.AbstractTaskExecutor; +import io.serverlessworkflow.impl.executors.TaskExecutor; +import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; +import io.serverlessworkflow.impl.marshaller.WorkflowOutputBuffer; +import java.io.ByteArrayOutputStream; + +public class BytesMapInstanceWriter extends BigMapIdInstanceWriter { + + private final WorkflowBufferFactory factory; + + public BytesMapInstanceWriter( + BigMapInstanceStore store, WorkflowBufferFactory factory) { + super(store); + this.factory = factory; + } + + @Override + protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskContext taskContext) { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try (WorkflowOutputBuffer writer = factory.output(bytes)) { + writer.writeByte(MarshallingUtils.VERSION_0); + writer.writeInstant(taskContext.completedAt()); + writeModel(writer, taskContext.output()); + writeModel(writer, contextData.context()); + boolean isEndNode = taskContext.transition().isEndNode(); + writer.writeBoolean(isEndNode); + TaskExecutor next = taskContext.transition().next(); + if (next == null) { + writer.writeBoolean(false); + } else { + writer.writeBoolean(true); + writer.writeString(((AbstractTaskExecutor) next).position().jsonPointer()); + } + } + + return bytes.toByteArray(); + } + + @Override + protected byte[] marshallStatus(WorkflowStatus status) { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try (WorkflowOutputBuffer writer = factory.output(bytes)) { + writer.writeByte(MarshallingUtils.VERSION_0); + writer.writeEnum(status); + } + return bytes.toByteArray(); + } + + @Override + protected byte[] marshallInstance(WorkflowInstanceData instance) { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try (WorkflowOutputBuffer writer = factory.output(bytes)) { + writer.writeByte(MarshallingUtils.VERSION_0); + writer.writeInstant(instance.startedAt()); + writeModel(writer, instance.input()); + } + return bytes.toByteArray(); + } + + protected void writeModel(WorkflowOutputBuffer writer, WorkflowModel model) { + writer.writeObject(model); + } +} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapPersistenceInstanceHandlers.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapPersistenceInstanceHandlers.java new file mode 100644 index 00000000..e7767888 --- /dev/null +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapPersistenceInstanceHandlers.java @@ -0,0 +1,72 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.bigmap; + +import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; + +import io.serverlessworkflow.impl.marshaller.DefaultBufferFactory; +import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceReader; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceWriter; + +public class BytesMapPersistenceInstanceHandlers extends PersistenceInstanceHandlers + implements AutoCloseable { + + private final BigMapInstanceStore store; + + protected BytesMapPersistenceInstanceHandlers( + PersistenceInstanceWriter writer, + PersistenceInstanceReader reader, + BigMapInstanceStore store) { + super(writer, reader); + this.store = store; + } + + public static class Builder { + private final BigMapInstanceStore store; + private WorkflowBufferFactory factory; + + private Builder(BigMapInstanceStore store) { + this.store = store; + } + + public Builder withFactory(WorkflowBufferFactory factory) { + this.factory = factory; + return this; + } + + public PersistenceInstanceHandlers build() { + if (factory == null) { + factory = DefaultBufferFactory.factory(); + } + return new BytesMapPersistenceInstanceHandlers( + new BytesMapInstanceWriter(store, factory), + new BytesMapInstanceReader(store, factory), + store); + } + } + + public static Builder builder(BigMapInstanceStore store) { + return new Builder(store); + } + + @Override + public void close() { + super.close(); + safeClose(store); + } +} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java new file mode 100644 index 00000000..2fea0224 --- /dev/null +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/MarshallingUtils.java @@ -0,0 +1,23 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.bigmap; + +class MarshallingUtils { + + private MarshallingUtils() {} + + public static final byte VERSION_0 = 0; +} diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/PersistenceInstanceInfo.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/PersistenceInstanceInfo.java new file mode 100644 index 00000000..b4582986 --- /dev/null +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/PersistenceInstanceInfo.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.bigmap; + +import io.serverlessworkflow.impl.WorkflowModel; +import java.time.Instant; + +public record PersistenceInstanceInfo(Instant startedAt, WorkflowModel input) {} diff --git a/impl/persistence/jackson-marshaller/pom.xml b/impl/persistence/jackson-marshaller/pom.xml new file mode 100644 index 00000000..322a557c --- /dev/null +++ b/impl/persistence/jackson-marshaller/pom.xml @@ -0,0 +1,20 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-persistence + 8.0.0-SNAPSHOT + + serverlessworkflow-persistence-jackson-marshaller + Serverless Workflow :: Impl :: Persistence:: Marshaller:: Jackson + + + io.serverlessworkflow + serverlessworkflow-persistence-api + + + io.serverlessworkflow + serverlessworkflow-impl-jackson + + + \ No newline at end of file diff --git a/impl/persistence/jackson-marshaller/src/main/java/io/serverlessworkflow/impl/marshaller/jackson/JacksonModelMarshaller.java b/impl/persistence/jackson-marshaller/src/main/java/io/serverlessworkflow/impl/marshaller/jackson/JacksonModelMarshaller.java new file mode 100644 index 00000000..ee0933ff --- /dev/null +++ b/impl/persistence/jackson-marshaller/src/main/java/io/serverlessworkflow/impl/marshaller/jackson/JacksonModelMarshaller.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.marshaller.jackson; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.serverlessworkflow.impl.expressions.jq.JacksonModel; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import io.serverlessworkflow.impl.marshaller.CustomObjectMarshaller; +import io.serverlessworkflow.impl.marshaller.WorkflowInputBuffer; +import io.serverlessworkflow.impl.marshaller.WorkflowOutputBuffer; +import java.io.IOException; +import java.io.UncheckedIOException; + +public class JacksonModelMarshaller implements CustomObjectMarshaller { + + @Override + public void write(WorkflowOutputBuffer buffer, JacksonModel object) { + try { + buffer.writeBytes(JsonUtils.mapper().writeValueAsBytes(object)); + } catch (JsonProcessingException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public JacksonModel read(WorkflowInputBuffer buffer) { + try { + return JsonUtils.mapper().readValue(buffer.readBytes(), JacksonModel.class); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public Class getObjectClass() { + return JacksonModel.class; + } +} diff --git a/impl/persistence/jackson-marshaller/src/main/resources/META-INF/services/io.serverlessworkflow.impl.marshaller.CustomObjectMarshaller b/impl/persistence/jackson-marshaller/src/main/resources/META-INF/services/io.serverlessworkflow.impl.marshaller.CustomObjectMarshaller new file mode 100644 index 00000000..81b32636 --- /dev/null +++ b/impl/persistence/jackson-marshaller/src/main/resources/META-INF/services/io.serverlessworkflow.impl.marshaller.CustomObjectMarshaller @@ -0,0 +1 @@ +io.serverlessworkflow.impl.marshaller.jackson.JacksonModelMarshaller \ No newline at end of file diff --git a/impl/persistence/mvstore/README.md b/impl/persistence/mvstore/README.md new file mode 100644 index 00000000..ff67da68 --- /dev/null +++ b/impl/persistence/mvstore/README.md @@ -0,0 +1,41 @@ +[![Gitpod ready-to-code](https://img.shields.io/badge/Gitpod-ready--to--code-blue?logo=gitpod)](https://gitpod.io/#https://github.com/serverlessworkflow/sdk-java) + +# Serverless Workflow Specification — Java SDK (Reference Implementation)- Persistence - MVStore + + +This document explains how to enable persistence using MVStore as underlying persistent mechanism. It is assumed that the reader is familiar with [standard workflow execution mechanism](../../README.md). + +To enable MVStore persistence, users should at least do the following things: + +- Initialize a MVStorePersistenceStore instance, passing the path of the file containing the persisted information +- Pass this MVStorePersitenceStore as argument of BytesMapPersistenceInstanceHandlers.builder. This will create PersistenceInstanceWriter and PersistenceInstanceReader. +- Use the PersistenceInstanceWriter created in the previous step to decorate the existing WorkflowApplication builder. + +The code will look like this + +---- + try (PersistenceInstanceHandlers handlers = + BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore("test.db")) + .build(); + WorkflowApplication application = + PersistenceApplicationBuilder.builder( + WorkflowApplication.builder(), + handlers.writer()) + .build(); ) + { + // run workflow normally, the progress will be persisted + } +---- + + +If user wants to resume execution of all previously existing instances (typically after a server crash), he can use the reader created in the previous block to retrieve all stored instances. + +Once retrieved, calling `start` method will resume the execution after the latest completed task before the running JVM was stopped. + +---- + handlers.reader().readAll(definition).values().forEach(WorkflowInstance::start); +---- + +--- + +*Questions or ideas? PRs and issues welcome!* diff --git a/impl/persistence/mvstore/pom.xml b/impl/persistence/mvstore/pom.xml new file mode 100644 index 00000000..28963d62 --- /dev/null +++ b/impl/persistence/mvstore/pom.xml @@ -0,0 +1,23 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-persistence + 8.0.0-SNAPSHOT + + + 1.4.199 + + serverlessworkflow-persistence-mvstore + Serverless Workflow :: Impl :: Persistence:: MVStore + + + com.h2database + h2-mvstore + + + io.serverlessworkflow + serverlessworkflow-persistence-big-map + + + \ No newline at end of file diff --git a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java new file mode 100644 index 00000000..0f206f9b --- /dev/null +++ b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.mvstore; + +import io.serverlessworkflow.impl.persistence.bigmap.BigMapInstanceStore; +import io.serverlessworkflow.impl.persistence.bigmap.BigMapInstanceTransaction; +import org.h2.mvstore.MVStore; +import org.h2.mvstore.tx.TransactionStore; + +public class MVStorePersistenceStore + implements BigMapInstanceStore { + private final TransactionStore mvStore; + + public MVStorePersistenceStore(String dbName) { + this.mvStore = new TransactionStore(MVStore.open(dbName)); + } + + @Override + public void close() { + mvStore.close(); + } + + @Override + public BigMapInstanceTransaction begin() { + return new MVStoreTransaction(mvStore.begin()); + } +} diff --git a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java new file mode 100644 index 00000000..66b09499 --- /dev/null +++ b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java @@ -0,0 +1,84 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.mvstore; + +import io.serverlessworkflow.api.types.Document; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowDefinitionData; +import io.serverlessworkflow.impl.persistence.bigmap.BigMapInstanceTransaction; +import java.util.Map; +import org.h2.mvstore.tx.Transaction; +import org.h2.mvstore.tx.TransactionMap; + +public class MVStoreTransaction + implements BigMapInstanceTransaction { + + protected static final String ID_SEPARATOR = "-"; + + private final Transaction transaction; + + public MVStoreTransaction(Transaction transaction) { + this.transaction = transaction; + } + + protected static String identifier(Workflow workflow, String sep) { + Document document = workflow.getDocument(); + return document.getNamespace() + sep + document.getName() + sep + document.getVersion(); + } + + @Override + public Map instanceData(WorkflowDefinitionData workflowContext) { + return openMap(workflowContext, "instances"); + } + + @Override + public Map tasks(String instanceId) { + return taskMap(instanceId); + } + + @Override + public Map status(WorkflowDefinitionData workflowContext) { + return openMap(workflowContext, "status"); + } + + @Override + public void cleanupTasks(String instanceId) { + transaction.removeMap(taskMap(instanceId)); + } + + private TransactionMap taskMap(String instanceId) { + return transaction.openMap(mapTaskName(instanceId)); + } + + private Map openMap(WorkflowDefinitionData workflowDefinition, String suffix) { + return transaction.openMap( + identifier(workflowDefinition.workflow(), ID_SEPARATOR) + ID_SEPARATOR + suffix); + } + + private String mapTaskName(String instanceId) { + return instanceId + ID_SEPARATOR + "tasks"; + } + + @Override + public void commit() { + transaction.commit(); + } + + @Override + public void rollback() { + transaction.rollback(); + } +} diff --git a/impl/persistence/pom.xml b/impl/persistence/pom.xml new file mode 100644 index 00000000..8e26ebb3 --- /dev/null +++ b/impl/persistence/pom.xml @@ -0,0 +1,17 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-impl + 8.0.0-SNAPSHOT + + serverlessworkflow-persistence + Serverless Workflow :: Implementation:: Persistence + pom + + jackson-marshaller + mvstore + bigmap + api + + diff --git a/impl/pom.xml b/impl/pom.xml index 41c4cb73..f252b49b 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -9,6 +9,7 @@ Serverless Workflow :: Impl pom + 1.4.199 8.3.0 4.0.0 1.6.0 @@ -26,6 +27,26 @@ serverlessworkflow-impl-http ${project.version} + + io.serverlessworkflow + serverlessworkflow-persistence-api + ${project.version} + + + io.serverlessworkflow + serverlessworkflow-persistence-big-map + ${project.version} + + + io.serverlessworkflow + serverlessworkflow-persistence-jackson-marshaller + ${project.version} + + + io.serverlessworkflow + serverlessworkflow-persistence-mvstore + ${project.version} + io.serverlessworkflow serverlessworkflow-impl-jackson-jwt @@ -63,6 +84,11 @@ ${version.org.glassfish.jersey} test + + com.h2database + h2-mvstore + ${version.com.h2database} + @@ -70,6 +96,7 @@ core jackson jwt-impl + persistence test \ No newline at end of file diff --git a/impl/test/db-samples/running.db b/impl/test/db-samples/running.db new file mode 100644 index 00000000..29adc703 Binary files /dev/null and b/impl/test/db-samples/running.db differ diff --git a/impl/test/db-samples/suspended.db b/impl/test/db-samples/suspended.db new file mode 100644 index 00000000..a174df9e Binary files /dev/null and b/impl/test/db-samples/suspended.db differ diff --git a/impl/test/pom.xml b/impl/test/pom.xml index 1009da33..f861f9c5 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -16,6 +16,15 @@ io.serverlessworkflow serverlessworkflow-api + + io.serverlessworkflow + serverlessworkflow-persistence-mvstore + + + io.serverlessworkflow + serverlessworkflow-persistence-jackson-marshaller + test + io.serverlessworkflow serverlessworkflow-impl-http diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java new file mode 100644 index 00000000..1d4f41cf --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; + +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; +import io.serverlessworkflow.impl.persistence.bigmap.BytesMapPersistenceInstanceHandlers; +import io.serverlessworkflow.impl.persistence.mvstore.MVStorePersistenceStore; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; + +public class DBGenerator { + + public static void main(String[] args) throws IOException { + runInstance("db-samples/running.db", false); + runInstance("db-samples/suspended.db", true); + } + + private static void runInstance(String dbName, boolean suspend) throws IOException { + Files.deleteIfExists(Path.of(dbName)); + try (PersistenceInstanceHandlers factories = + BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName)) + .build(); + WorkflowApplication application = + PersistenceApplicationBuilder.builder( + WorkflowApplication.builder().withListener(new TraceExecutionListener()), + factories.writer()) + .build()) { + WorkflowDefinition definition = + application.workflowDefinition( + readWorkflowFromClasspath("workflows-samples/set-listen-to-any.yaml")); + WorkflowInstance instance = definition.instance(Map.of()); + instance.start(); + if (suspend) { + instance.suspend(); + } + } + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java new file mode 100644 index 00000000..bd591e3f --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java @@ -0,0 +1,110 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.assertj.core.api.Assertions.assertThat; + +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; +import io.serverlessworkflow.impl.persistence.bigmap.BytesMapPersistenceInstanceHandlers; +import io.serverlessworkflow.impl.persistence.mvstore.MVStorePersistenceStore; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Map; +import org.junit.jupiter.api.Test; + +public class MvStorePersistenceTest { + + @Test + void testSimpleRun() throws IOException { + final String dbName = "db-samples/simple.db"; + try (PersistenceInstanceHandlers handlers = + BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName)) + .build(); + WorkflowApplication application = + PersistenceApplicationBuilder.builder(WorkflowApplication.builder(), handlers.writer()) + .build(); ) { + WorkflowDefinition definition = + application.workflowDefinition( + readWorkflowFromClasspath("workflows-samples/simple-expression.yaml")); + assertThat(handlers.reader().readAll(definition).values()).isEmpty(); + definition.instance(Map.of()).start().join(); + assertThat(handlers.reader().readAll(definition).values()).isEmpty(); + } finally { + Files.delete(Path.of(dbName)); + } + } + + @Test + void testWaitingInstance() throws IOException { + TaskCounterPerInstanceListener taskCounter = new TaskCounterPerInstanceListener(); + try (WorkflowApplication application = + WorkflowApplication.builder().withListener(taskCounter).build()) { + WorkflowDefinition definition = + application.workflowDefinition( + readWorkflowFromClasspath("workflows-samples/set-listen-to-any.yaml")); + + WorkflowInstance instance = definition.instance(Map.of()); + instance.start(); + assertThat(taskCounter.taskCounter(instance.id()).completed()).isEqualTo(1); + } + } + + @Test + void testRestoreWaitingInstance() throws IOException { + runIt("db-samples/running.db", WorkflowStatus.WAITING); + } + + @Test + void testRestoreSuspendedInstance() throws IOException { + runIt("db-samples/suspended.db", WorkflowStatus.SUSPENDED); + } + + private void runIt(String dbName, WorkflowStatus expectedStatus) throws IOException { + TaskCounterPerInstanceListener taskCounter = new TaskCounterPerInstanceListener(); + try (PersistenceInstanceHandlers handlers = + BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName)) + .build(); + WorkflowApplication application = + PersistenceApplicationBuilder.builder( + WorkflowApplication.builder() + .withListener(taskCounter) + .withListener(new TraceExecutionListener()), + handlers.writer()) + .build(); ) { + WorkflowDefinition definition = + application.workflowDefinition( + readWorkflowFromClasspath("workflows-samples/set-listen-to-any.yaml")); + Collection instances = handlers.reader().readAll(definition).values(); + assertThat(instances).hasSize(1); + instances.forEach(WorkflowInstance::start); + assertThat(instances) + .singleElement() + .satisfies( + instance -> { + assertThat(instance.status()).isEqualTo(expectedStatus); + assertThat(taskCounter.taskCounter(instance.id()).completed()).isEqualTo(0); + }); + } + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TaskCounterPerInstanceListener.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/TaskCounterPerInstanceListener.java new file mode 100644 index 00000000..19fe6217 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/TaskCounterPerInstanceListener.java @@ -0,0 +1,65 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class TaskCounterPerInstanceListener implements WorkflowExecutionListener { + + public static class TaskCounter { + private int started; + private int completed; + + public void incStarted() { + started++; + } + + public void incCompleted() { + completed++; + } + + public int started() { + return started; + } + + public int completed() { + return completed; + } + } + + private Map taskCounter = new ConcurrentHashMap<>(); + + public void onTaskStarted(TaskStartedEvent ev) { + taskCounter(ev).incStarted(); + } + + private TaskCounter taskCounter(WorkflowEvent ev) { + return taskCounter(ev.workflowContext().instanceData().id()); + } + + public TaskCounter taskCounter(String instanceId) { + return taskCounter.computeIfAbsent(instanceId, k -> new TaskCounter()); + } + + public void onTaskCompleted(TaskCompletedEvent ev) { + taskCounter(ev).incCompleted(); + } +} diff --git a/impl/test/src/test/resources/workflows-samples/set-listen-to-any.yaml b/impl/test/src/test/resources/workflows-samples/set-listen-to-any.yaml new file mode 100644 index 00000000..d82936e9 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/set-listen-to-any.yaml @@ -0,0 +1,13 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: set-listen-to-any + version: '0.1.0' +do: + - doSomethingBeforeEvent: + set: + name: javierito + - callDoctor: + listen: + to: + any: [] \ No newline at end of file