Skip to content

Commit 498139e

Browse files
committed
[Fix serverlessworkflow#704] Adding CloudEvents for Workflow suspended and resumed (serverlessworkflow#741)
Signed-off-by: fjtirado <[email protected]>
1 parent eaba526 commit 498139e

File tree

10 files changed

+303
-2
lines changed

10 files changed

+303
-2
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@
1818
import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent;
1919

2020
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
21+
import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent;
22+
import io.serverlessworkflow.impl.lifecycle.TaskSuspendedEvent;
2123
import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent;
2224
import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
2325
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
26+
import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent;
2427
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
28+
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
2529
import java.time.Instant;
2630
import java.util.Optional;
2731
import java.util.concurrent.CancellationException;
@@ -190,6 +194,11 @@ public boolean resume() {
190194
CompletableFuture<TaskContext> toBeCompleted = suspended;
191195
suspended = null;
192196
toBeCompleted.complete(suspendedTask);
197+
publishEvent(
198+
workflowContext,
199+
l -> l.onTaskResumed(new TaskResumedEvent(workflowContext, suspendedTask)));
200+
publishEvent(
201+
workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext)));
193202
} else {
194203
suspended = null;
195204
}
@@ -208,6 +217,11 @@ public CompletableFuture<TaskContext> completedChecks(TaskContext t) {
208217
if (suspended != null) {
209218
suspendedTask = t;
210219
workflowContext.instance().status(WorkflowStatus.SUSPENDED);
220+
publishEvent(
221+
workflowContext, l -> l.onTaskSuspended(new TaskSuspendedEvent(workflowContext, t)));
222+
publishEvent(
223+
workflowContext,
224+
l -> l.onWorkflowSuspended(new WorkflowSuspendedEvent(workflowContext)));
211225
return suspended;
212226
}
213227
if (cancelled != null) {

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,21 @@
2525
import io.serverlessworkflow.impl.WorkflowModel;
2626
import io.serverlessworkflow.impl.events.CloudEventUtils;
2727
import io.serverlessworkflow.impl.events.EventPublisher;
28+
import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent;
2829
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
2930
import io.serverlessworkflow.impl.lifecycle.TaskEvent;
3031
import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent;
32+
import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent;
3133
import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent;
34+
import io.serverlessworkflow.impl.lifecycle.TaskSuspendedEvent;
35+
import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent;
3236
import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
3337
import io.serverlessworkflow.impl.lifecycle.WorkflowEvent;
3438
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
3539
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
40+
import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent;
3641
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
42+
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
3743
import java.time.OffsetDateTime;
3844

3945
public abstract class AbstractLifeCyclePublisher implements WorkflowExecutionListener {
@@ -65,6 +71,45 @@ public void onTaskCompleted(TaskCompletedEvent ev) {
6571
.build());
6672
}
6773

74+
@Override
75+
public void onTaskSuspended(TaskSuspendedEvent ev) {
76+
eventPublisher(ev)
77+
.publish(
78+
builder()
79+
.withData(
80+
cloudEventData(
81+
new TaskSuspendedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
82+
this::convert))
83+
.withType("io.serverlessworkflow.task.suspended.v1")
84+
.build());
85+
}
86+
87+
@Override
88+
public void onTaskResumed(TaskResumedEvent ev) {
89+
eventPublisher(ev)
90+
.publish(
91+
builder()
92+
.withData(
93+
cloudEventData(
94+
new TaskResumedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
95+
this::convert))
96+
.withType("io.serverlessworkflow.task.resumed.v1")
97+
.build());
98+
}
99+
100+
@Override
101+
public void onTaskCancelled(TaskCancelledEvent ev) {
102+
eventPublisher(ev)
103+
.publish(
104+
builder()
105+
.withData(
106+
cloudEventData(
107+
new TaskCancelledCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
108+
this::convert))
109+
.withType("io.serverlessworkflow.task.cancelled.v1")
110+
.build());
111+
}
112+
68113
@Override
69114
public void onTaskFailed(TaskFailedEvent ev) {
70115
eventPublisher(ev)
@@ -90,6 +135,44 @@ public void onWorkflowStarted(WorkflowStartedEvent ev) {
90135
.build());
91136
}
92137

138+
@Override
139+
public void onWorkflowSuspended(WorkflowSuspendedEvent ev) {
140+
eventPublisher(ev)
141+
.publish(
142+
builder()
143+
.withData(
144+
cloudEventData(
145+
new WorkflowSuspendedCEData(id(ev), ref(ev), ev.eventDate()),
146+
this::convert))
147+
.withType("io.serverlessworkflow.workflow.suspended.v1")
148+
.build());
149+
}
150+
151+
@Override
152+
public void onWorkflowCancelled(WorkflowCancelledEvent ev) {
153+
eventPublisher(ev)
154+
.publish(
155+
builder()
156+
.withData(
157+
cloudEventData(
158+
new WorkflowCancelledCEData(id(ev), ref(ev), ev.eventDate()),
159+
this::convert))
160+
.withType("io.serverlessworkflow.workflow.cancelled.v1")
161+
.build());
162+
}
163+
164+
@Override
165+
public void onWorkflowResumed(WorkflowResumedEvent ev) {
166+
eventPublisher(ev)
167+
.publish(
168+
builder()
169+
.withData(
170+
cloudEventData(
171+
new WorkflowResumedCEData(id(ev), ref(ev), ev.eventDate()), this::convert))
172+
.withType("io.serverlessworkflow.workflow.resumed.v1")
173+
.build());
174+
}
175+
93176
@Override
94177
public void onWorkflowCompleted(WorkflowCompletedEvent ev) {
95178
eventPublisher(ev)
@@ -118,6 +201,12 @@ public void onWorkflowFailed(WorkflowFailedEvent ev) {
118201

119202
protected abstract byte[] convert(WorkflowStartedCEData data);
120203

204+
protected abstract byte[] convert(WorkflowSuspendedCEData data);
205+
206+
protected abstract byte[] convert(WorkflowResumedCEData data);
207+
208+
protected abstract byte[] convert(WorkflowCancelledCEData data);
209+
121210
protected abstract byte[] convert(WorkflowCompletedCEData data);
122211

123212
protected abstract byte[] convert(TaskStartedCEData data);
@@ -126,6 +215,12 @@ public void onWorkflowFailed(WorkflowFailedEvent ev) {
126215

127216
protected abstract byte[] convert(TaskFailedCEData data);
128217

218+
protected abstract byte[] convert(TaskSuspendedCEData data);
219+
220+
protected abstract byte[] convert(TaskCancelledCEData data);
221+
222+
protected abstract byte[] convert(TaskResumedCEData data);
223+
129224
protected abstract byte[] convert(WorkflowFailedCEData data);
130225

131226
private static <T> CloudEventData cloudEventData(T data, ToBytes<T> toBytes) {
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.lifecycle.ce;
17+
18+
import java.time.OffsetDateTime;
19+
20+
public record TaskCancelledCEData(
21+
String workflow,
22+
String task,
23+
WorkflowDefinitionCEData definition,
24+
OffsetDateTime cancelledAt) {}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.lifecycle.ce;
17+
18+
import java.time.OffsetDateTime;
19+
20+
public record TaskResumedCEData(
21+
String workflow, String task, WorkflowDefinitionCEData definition, OffsetDateTime resumedAt) {}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.lifecycle.ce;
17+
18+
import java.time.OffsetDateTime;
19+
20+
public record TaskSuspendedCEData(
21+
String workflow,
22+
String task,
23+
WorkflowDefinitionCEData definition,
24+
OffsetDateTime suspendedAt) {}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.lifecycle.ce;
17+
18+
import java.time.OffsetDateTime;
19+
20+
public record WorkflowCancelledCEData(
21+
String name, WorkflowDefinitionCEData definition, OffsetDateTime cancelledAt) {}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.lifecycle.ce;
17+
18+
import java.time.OffsetDateTime;
19+
20+
public record WorkflowResumedCEData(
21+
String name, WorkflowDefinitionCEData definition, OffsetDateTime resumedAt) {}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.lifecycle.ce;
17+
18+
import java.time.OffsetDateTime;
19+
20+
public record WorkflowSuspendedCEData(
21+
String name, WorkflowDefinitionCEData definition, OffsetDateTime suspendedAt) {}

impl/jackson/src/main/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisher.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,18 @@
1818
import com.fasterxml.jackson.core.JsonProcessingException;
1919
import io.serverlessworkflow.impl.jackson.JsonUtils;
2020
import io.serverlessworkflow.impl.lifecycle.ce.AbstractLifeCyclePublisher;
21+
import io.serverlessworkflow.impl.lifecycle.ce.TaskCancelledCEData;
2122
import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEData;
2223
import io.serverlessworkflow.impl.lifecycle.ce.TaskFailedCEData;
24+
import io.serverlessworkflow.impl.lifecycle.ce.TaskResumedCEData;
2325
import io.serverlessworkflow.impl.lifecycle.ce.TaskStartedCEData;
26+
import io.serverlessworkflow.impl.lifecycle.ce.TaskSuspendedCEData;
27+
import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCancelledCEData;
2428
import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCompletedCEData;
2529
import io.serverlessworkflow.impl.lifecycle.ce.WorkflowFailedCEData;
30+
import io.serverlessworkflow.impl.lifecycle.ce.WorkflowResumedCEData;
2631
import io.serverlessworkflow.impl.lifecycle.ce.WorkflowStartedCEData;
32+
import io.serverlessworkflow.impl.lifecycle.ce.WorkflowSuspendedCEData;
2733
import java.io.UncheckedIOException;
2834

2935
public class JacksonLifeCyclePublisher extends AbstractLifeCyclePublisher {
@@ -58,6 +64,36 @@ protected byte[] convert(WorkflowFailedCEData data) {
5864
return genericConvert(data);
5965
}
6066

67+
@Override
68+
protected byte[] convert(WorkflowSuspendedCEData data) {
69+
return genericConvert(data);
70+
}
71+
72+
@Override
73+
protected byte[] convert(WorkflowResumedCEData data) {
74+
return genericConvert(data);
75+
}
76+
77+
@Override
78+
protected byte[] convert(WorkflowCancelledCEData data) {
79+
return genericConvert(data);
80+
}
81+
82+
@Override
83+
protected byte[] convert(TaskSuspendedCEData data) {
84+
return genericConvert(data);
85+
}
86+
87+
@Override
88+
protected byte[] convert(TaskCancelledCEData data) {
89+
return genericConvert(data);
90+
}
91+
92+
@Override
93+
protected byte[] convert(TaskResumedCEData data) {
94+
return genericConvert(data);
95+
}
96+
6197
protected <T> byte[] genericConvert(T data) {
6298
try {
6399
return JsonUtils.mapper().writeValueAsBytes(data);

0 commit comments

Comments
 (0)