Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent;

import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskSuspendedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CancellationException;
Expand Down Expand Up @@ -190,6 +194,11 @@ public boolean resume() {
CompletableFuture<TaskContext> toBeCompleted = suspended;
suspended = null;
toBeCompleted.complete(suspendedTask);
publishEvent(
workflowContext,
l -> l.onTaskResumed(new TaskResumedEvent(workflowContext, suspendedTask)));
publishEvent(
workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext)));
} else {
suspended = null;
}
Expand All @@ -208,6 +217,11 @@ public CompletableFuture<TaskContext> completedChecks(TaskContext t) {
if (suspended != null) {
suspendedTask = t;
workflowContext.instance().status(WorkflowStatus.SUSPENDED);
publishEvent(
workflowContext, l -> l.onTaskSuspended(new TaskSuspendedEvent(workflowContext, t)));
publishEvent(
workflowContext,
l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext)));
return suspended;
}
if (cancelled != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,21 @@
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.events.CloudEventUtils;
import io.serverlessworkflow.impl.events.EventPublisher;
import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent;
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskEvent;
import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskSuspendedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
import java.time.OffsetDateTime;

public abstract class AbstractLifeCyclePublisher implements WorkflowExecutionListener {
Expand Down Expand Up @@ -65,6 +71,45 @@ public void onTaskCompleted(TaskCompletedEvent ev) {
.build());
}

@Override
public void onTaskSuspended(TaskSuspendedEvent ev) {
eventPublisher(ev)
.publish(
builder()
.withData(
cloudEventData(
new TaskSuspendedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
this::convert))
.withType("io.serverlessworkflow.task.suspended.v1")
.build());
}

@Override
public void onTaskResumed(TaskResumedEvent ev) {
eventPublisher(ev)
.publish(
builder()
.withData(
cloudEventData(
new TaskResumedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
this::convert))
.withType("io.serverlessworkflow.task.resumed.v1")
.build());
}

@Override
public void onTaskCancelled(TaskCancelledEvent ev) {
eventPublisher(ev)
.publish(
builder()
.withData(
cloudEventData(
new TaskCancelledCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
this::convert))
.withType("io.serverlessworkflow.task.cancelled.v1")
.build());
}

@Override
public void onTaskFailed(TaskFailedEvent ev) {
eventPublisher(ev)
Expand All @@ -90,6 +135,44 @@ public void onWorkflowStarted(WorkflowStartedEvent ev) {
.build());
}

@Override
public void onWorkflowSuspended(WorkflowSuspendedEvent ev) {
eventPublisher(ev)
.publish(
builder()
.withData(
cloudEventData(
new WorkflowSuspendedCEData(id(ev), ref(ev), ev.eventDate()),
this::convert))
.withType("io.serverlessworkflow.workflow.suspended.v1")
.build());
}

@Override
public void onWorkflowCancelled(WorkflowCancelledEvent ev) {
eventPublisher(ev)
.publish(
builder()
.withData(
cloudEventData(
new WorkflowCancelledCEData(id(ev), ref(ev), ev.eventDate()),
this::convert))
.withType("io.serverlessworkflow.workflow.cancelled.v1")
.build());
}

@Override
public void onWorkflowResumed(WorkflowResumedEvent ev) {
eventPublisher(ev)
.publish(
builder()
.withData(
cloudEventData(
new WorkflowResumedCEData(id(ev), ref(ev), ev.eventDate()), this::convert))
.withType("io.serverlessworkflow.workflow.resumed.v1")
.build());
}

@Override
public void onWorkflowCompleted(WorkflowCompletedEvent ev) {
eventPublisher(ev)
Expand Down Expand Up @@ -118,6 +201,12 @@ public void onWorkflowFailed(WorkflowFailedEvent ev) {

protected abstract byte[] convert(WorkflowStartedCEData data);

protected abstract byte[] convert(WorkflowSuspendedCEData data);

protected abstract byte[] convert(WorkflowResumedCEData data);

protected abstract byte[] convert(WorkflowCancelledCEData data);

protected abstract byte[] convert(WorkflowCompletedCEData data);

protected abstract byte[] convert(TaskStartedCEData data);
Expand All @@ -126,6 +215,12 @@ public void onWorkflowFailed(WorkflowFailedEvent ev) {

protected abstract byte[] convert(TaskFailedCEData data);

protected abstract byte[] convert(TaskSuspendedCEData data);

protected abstract byte[] convert(TaskCancelledCEData data);

protected abstract byte[] convert(TaskResumedCEData data);

protected abstract byte[] convert(WorkflowFailedCEData data);

private static <T> CloudEventData cloudEventData(T data, ToBytes<T> toBytes) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.lifecycle.ce;

import java.time.OffsetDateTime;

public record TaskCancelledCEData(
String workflow,
String task,
WorkflowDefinitionCEData definition,
OffsetDateTime cancelledAt) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.lifecycle.ce;

import java.time.OffsetDateTime;

public record TaskResumedCEData(
String workflow, String task, WorkflowDefinitionCEData definition, OffsetDateTime resumedAt) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.lifecycle.ce;

import java.time.OffsetDateTime;

public record TaskSuspendedCEData(
String workflow,
String task,
WorkflowDefinitionCEData definition,
OffsetDateTime suspendedAt) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.lifecycle.ce;

import java.time.OffsetDateTime;

public record WorkflowCancelledCEData(
String name, WorkflowDefinitionCEData definition, OffsetDateTime cancelledAt) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.lifecycle.ce;

import java.time.OffsetDateTime;

public record WorkflowResumedCEData(
String name, WorkflowDefinitionCEData definition, OffsetDateTime resumedAt) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.lifecycle.ce;

import java.time.OffsetDateTime;

public record WorkflowSuspendedCEData(
String name, WorkflowDefinitionCEData definition, OffsetDateTime suspendedAt) {}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import io.serverlessworkflow.impl.jackson.JsonUtils;
import io.serverlessworkflow.impl.lifecycle.ce.AbstractLifeCyclePublisher;
import io.serverlessworkflow.impl.lifecycle.ce.TaskCancelledCEData;
import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEData;
import io.serverlessworkflow.impl.lifecycle.ce.TaskFailedCEData;
import io.serverlessworkflow.impl.lifecycle.ce.TaskResumedCEData;
import io.serverlessworkflow.impl.lifecycle.ce.TaskStartedCEData;
import io.serverlessworkflow.impl.lifecycle.ce.TaskSuspendedCEData;
import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCancelledCEData;
import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCompletedCEData;
import io.serverlessworkflow.impl.lifecycle.ce.WorkflowFailedCEData;
import io.serverlessworkflow.impl.lifecycle.ce.WorkflowResumedCEData;
import io.serverlessworkflow.impl.lifecycle.ce.WorkflowStartedCEData;
import io.serverlessworkflow.impl.lifecycle.ce.WorkflowSuspendedCEData;
import java.io.UncheckedIOException;

public class JacksonLifeCyclePublisher extends AbstractLifeCyclePublisher {
Expand Down Expand Up @@ -58,6 +64,36 @@ protected byte[] convert(WorkflowFailedCEData data) {
return genericConvert(data);
}

@Override
protected byte[] convert(WorkflowSuspendedCEData data) {
return genericConvert(data);
}

@Override
protected byte[] convert(WorkflowResumedCEData data) {
return genericConvert(data);
}

@Override
protected byte[] convert(WorkflowCancelledCEData data) {
return genericConvert(data);
}

@Override
protected byte[] convert(TaskSuspendedCEData data) {
return genericConvert(data);
}

@Override
protected byte[] convert(TaskCancelledCEData data) {
return genericConvert(data);
}

@Override
protected byte[] convert(TaskResumedCEData data) {
return genericConvert(data);
}

protected <T> byte[] genericConvert(T data) {
try {
return JsonUtils.mapper().writeValueAsBytes(data);
Expand Down
Loading