Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions impl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ As shown in previous examples, to start a new workflow instance, first a [Workfl

Once started, and before it completes, a workflow instance execution can be suspended or cancelled. Once cancelled, a workflow instance is done, while a suspended one might be resumed.

## Persistence

Workflow progress might be recorded into DB. See [details](persistence/README.md)

## Fluent Java DSL

Prefer building workflows programmatically with type-safe builders and recipes?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class TaskContext implements TaskContextData {
private WorkflowModel rawOutput;
private Instant completedAt;
private TransitionInfo transition;
private boolean completed;

public TaskContext(
WorkflowModel input,
Expand Down Expand Up @@ -109,6 +110,7 @@ public WorkflowModel rawOutput() {

public TaskContext output(WorkflowModel output) {
this.output = output;
this.completed = true;
return this;
}

Expand Down Expand Up @@ -159,6 +161,10 @@ public TaskContext transition(TransitionInfo transition) {
return this;
}

public boolean isCompleted() {
return completed;
}

@Override
public String toString() {
return "TaskContext [position="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.serverlessworkflow.impl;

import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;

import io.serverlessworkflow.api.types.SchemaInline;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.impl.events.EventConsumer;
Expand All @@ -39,13 +41,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkflowApplication implements AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(WorkflowApplication.class);

private final TaskExecutorFactory taskFactory;
private final ExpressionFactory exprFactory;
private final ResourceLoaderFactory resourceLoaderFactory;
Expand Down Expand Up @@ -271,14 +269,11 @@ public void close() {
safeClose(definition);
}
definitions.clear();
}

private void safeClose(AutoCloseable closeable) {
try {
closeable.close();
} catch (Exception ex) {
logger.warn("Error closing resource {}", closeable.getClass().getName(), ex);
for (WorkflowExecutionListener listener : listeners) {
safeClose(listener);
}
listeners.clear();
}

public WorkflowPositionFactory positionFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.serverlessworkflow.impl.resources.ResourceLoader;
import io.serverlessworkflow.impl.schema.SchemaValidator;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData {
Expand All @@ -37,6 +39,7 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
private final WorkflowApplication application;
private final TaskExecutor<?> taskExecutor;
private final ResourceLoader resourceLoader;
private final Map<String, TaskExecutor<?>> executors = new HashMap<>();

private WorkflowDefinition(
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
Expand Down Expand Up @@ -70,7 +73,9 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
}

public WorkflowInstance instance(Object input) {
return new WorkflowMutableInstance(this, application.modelFactory().fromAny(input));
WorkflowModel inputModel = application.modelFactory().fromAny(input);
inputSchemaValidator().ifPresent(v -> v.validate(inputModel));
return new WorkflowMutableInstance(this, application().idFactory().get(), inputModel);
}

Optional<SchemaValidator> inputSchemaValidator() {
Expand Down Expand Up @@ -107,8 +112,14 @@ public ResourceLoader resourceLoader() {
return resourceLoader;
}

@Override
public void close() {
// TODO close resourcers hold for uncompleted process instances, if any
public TaskExecutor<?> taskExecutor(String jsonPointer) {
return executors.get(jsonPointer);
}

public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor<?> taskExecutor) {
executors.put(position.jsonPointer(), taskExecutor);
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,32 +38,43 @@
public class WorkflowMutableInstance implements WorkflowInstance {

protected final AtomicReference<WorkflowStatus> status;
private final String id;
private final WorkflowModel input;
protected final String id;
protected final WorkflowModel input;

protected final WorkflowContext workflowContext;
protected Instant startedAt;

protected AtomicReference<CompletableFuture<WorkflowModel>> futureRef = new AtomicReference<>();
protected Instant completedAt;

private WorkflowContext workflowContext;
private Instant startedAt;
private Instant completedAt;
private volatile WorkflowModel output;
private Lock statusLock = new ReentrantLock();
private CompletableFuture<WorkflowModel> completableFuture;
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;

WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) {
this.id = definition.application().idFactory().get();
protected WorkflowMutableInstance(WorkflowDefinition definition, String id, WorkflowModel input) {
this.id = id;
this.input = input;
this.status = new AtomicReference<>(WorkflowStatus.PENDING);
definition.inputSchemaValidator().ifPresent(v -> v.validate(input));
this.workflowContext = new WorkflowContext(definition, this);
}

@Override
public CompletableFuture<WorkflowModel> start() {
this.startedAt = Instant.now();
this.status.set(WorkflowStatus.RUNNING);
publishEvent(
workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext)));
this.completableFuture =
return startExecution(
() -> {
startedAt = Instant.now();
publishEvent(
workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext)));
});
}

protected final CompletableFuture<WorkflowModel> startExecution(Runnable runnable) {
CompletableFuture<WorkflowModel> future = futureRef.get();
if (future != null) {
return future;
}
status.set(WorkflowStatus.RUNNING);
runnable.run();
future =
TaskExecutorHelper.processTaskList(
workflowContext.definition().startTask(),
workflowContext,
Expand All @@ -75,7 +86,8 @@ public CompletableFuture<WorkflowModel> start() {
.orElse(input))
.whenComplete(this::whenFailed)
.thenApply(this::whenSuccess);
return completableFuture;
futureRef.set(future);
return future;
}

private void whenFailed(WorkflowModel result, Throwable ex) {
Expand All @@ -94,7 +106,7 @@ private void handleException(Throwable ex) {
}

private WorkflowModel whenSuccess(WorkflowModel node) {
output =
WorkflowModel output =
workflowContext
.definition()
.outputFilter()
Expand All @@ -103,7 +115,8 @@ private WorkflowModel whenSuccess(WorkflowModel node) {
workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output));
status.set(WorkflowStatus.COMPLETED);
publishEvent(
workflowContext, l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext)));
workflowContext,
l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext, output)));
return output;
}

Expand Down Expand Up @@ -134,11 +147,13 @@ public WorkflowStatus status() {

@Override
public WorkflowModel output() {
return output;
CompletableFuture<WorkflowModel> future = futureRef.get();
return future != null ? future.join() : null;
}

@Override
public <T> T outputAs(Class<T> clazz) {
WorkflowModel output = output();
return output != null
? output
.as(clazz)
Expand Down Expand Up @@ -171,8 +186,7 @@ public boolean suspend() {
try {
statusLock.lock();
if (TaskExecutorHelper.isActive(status.get()) && suspended == null) {
suspended = new ConcurrentHashMap<>();
status.set(WorkflowStatus.SUSPENDED);
internalSuspend();
publishEvent(
workflowContext,
l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext)));
Expand All @@ -185,6 +199,11 @@ public boolean suspend() {
}
}

protected final void internalSuspend() {
suspended = new ConcurrentHashMap<>();
status.set(WorkflowStatus.SUSPENDED);
}

@Override
public boolean resume() {
try {
Expand Down Expand Up @@ -253,4 +272,6 @@ public boolean cancel() {
statusLock.unlock();
}
}

public void restoreContext(WorkflowContext workflow, TaskContext context) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkflowUtils {

private WorkflowUtils() {}

private static final Logger logger = LoggerFactory.getLogger(WorkflowUtils.class);

public static Optional<SchemaValidator> getSchemaValidator(
SchemaValidatorFactory validatorFactory, ResourceLoader resourceLoader, SchemaUnion schema) {
if (schema != null) {
Expand Down Expand Up @@ -138,4 +142,14 @@ public static String toString(UriTemplate template) {
URI uri = template.getLiteralUri();
return uri != null ? uri.toString() : template.getLiteralUriTemplate();
}

public static void safeClose(AutoCloseable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (Exception ex) {
logger.warn("Error closing resource {}", closeable.getClass().getName(), ex);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,13 @@ public abstract static class AbstractTaskExecutorBuilder<
protected final WorkflowApplication application;
protected final Workflow workflow;
protected final ResourceLoader resourceLoader;
private final WorkflowDefinition definition;

private V instance;

protected AbstractTaskExecutorBuilder(
WorkflowMutablePosition position, T task, WorkflowDefinition definition) {
this.definition = definition;
this.workflow = definition.workflow();
this.taskName = position.last().toString();
this.position = position;
Expand Down Expand Up @@ -147,6 +149,7 @@ public V build() {
if (instance == null) {
instance = buildInstance();
buildTransition(instance);
definition.addTaskExecutor(position, instance);
}
return instance;
}
Expand Down Expand Up @@ -189,11 +192,13 @@ private CompletableFuture<TaskContext> executeNext(
public CompletableFuture<TaskContext> apply(
WorkflowContext workflowContext, Optional<TaskContext> parentContext, WorkflowModel input) {
TaskContext taskContext = new TaskContext(input, position, parentContext, taskName, task);
workflowContext.instance().restoreContext(workflowContext, taskContext);
CompletableFuture<TaskContext> completable = CompletableFuture.completedFuture(taskContext);
if (!TaskExecutorHelper.isActive(workflowContext)) {
return completable;
}
if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) {
} else if (taskContext.isCompleted()) {
return executeNext(completable, workflowContext);
} else if (ifFilter.map(f -> f.test(workflowContext, taskContext, input)).orElse(true)) {
return executeNext(
completable
.thenCompose(workflowContext.instance()::suspendedCheck)
Expand Down Expand Up @@ -256,6 +261,10 @@ private void handleException(
}
}

public WorkflowPosition position() {
return position;
}

protected abstract TransitionInfo getSkipTransition();

protected abstract CompletableFuture<TaskContext> execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@
package io.serverlessworkflow.impl.lifecycle;

import io.serverlessworkflow.impl.WorkflowContextData;
import io.serverlessworkflow.impl.WorkflowModel;

public class WorkflowCompletedEvent extends WorkflowEvent {

public WorkflowCompletedEvent(WorkflowContextData workflow) {
private WorkflowModel output;

public WorkflowCompletedEvent(WorkflowContextData workflow, WorkflowModel output) {
super(workflow);
this.output = output;
}

public WorkflowModel output() {
return output;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.serverlessworkflow.impl.lifecycle;

public interface WorkflowExecutionListener {
public interface WorkflowExecutionListener extends AutoCloseable {

default void onWorkflowStarted(WorkflowStartedEvent ev) {}

Expand All @@ -42,4 +42,7 @@ default void onTaskSuspended(TaskSuspendedEvent ev) {}
default void onTaskResumed(TaskResumedEvent ev) {}

default void onTaskRetried(TaskRetriedEvent ev) {}

@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ public void onWorkflowCompleted(WorkflowCompletedEvent event) {
builder()
.withData(
cloudEventData(
new WorkflowCompletedCEData(id(ev), ref(ev), ev.eventDate(), output(ev)),
new WorkflowCompletedCEData(
id(ev), ref(ev), ev.eventDate(), from(event.output())),
this::convert))
.withType(WORKFLOW_COMPLETED)
.build());
Expand Down Expand Up @@ -328,10 +329,6 @@ private static String pos(TaskEvent ev) {
return ev.taskContext().position().jsonPointer();
}

private static Object output(WorkflowEvent ev) {
return from(ev.workflowContext().instanceData().output());
}

private static Object output(TaskEvent ev) {
return from(ev.taskContext().output());
}
Expand Down
Loading