Skip to content

Commit 6d23850

Browse files
committed
[Fix #744] Disabling event life cycle publishing
In some scenarios, users wont need the cloud events to be published. This allows user to disable publishing at application level. Signed-off-by: Francisco Javier Tirado Sarti <[email protected]>
1 parent d361b02 commit 6d23850

File tree

4 files changed

+113
-126
lines changed

4 files changed

+113
-126
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public class WorkflowApplication implements AutoCloseable {
5959
private final RuntimeDescriptorFactory runtimeDescriptorFactory;
6060
private final EventConsumer<?, ?> eventConsumer;
6161
private final EventPublisher eventPublisher;
62+
private final boolean lifeCycleCEPublishingEnabled;
6263

6364
private WorkflowApplication(Builder builder) {
6465
this.taskFactory = builder.taskFactory;
@@ -73,6 +74,7 @@ private WorkflowApplication(Builder builder) {
7374
this.definitions = new ConcurrentHashMap<>();
7475
this.eventConsumer = builder.eventConsumer;
7576
this.eventPublisher = builder.eventPublisher;
77+
this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled;
7678
}
7779

7880
public TaskExecutorFactory taskFactory() {
@@ -145,6 +147,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
145147
private EventPublisher eventPublisher;
146148
private RuntimeDescriptorFactory descriptorFactory =
147149
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
150+
private boolean lifeCycleCEPublishingEnabled = true;
148151

149152
private Builder() {}
150153

@@ -168,6 +171,11 @@ public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) {
168171
return this;
169172
}
170173

174+
public Builder disableLifeCycleCEPublishing() {
175+
this.lifeCycleCEPublishingEnabled = false;
176+
return this;
177+
}
178+
171179
public Builder withExecutorFactory(ExecutorServiceFactory executorFactory) {
172180
this.executorFactory = executorFactory;
173181
return this;
@@ -278,4 +286,8 @@ public EventConsumer eventConsumer() {
278286
public ExecutorService executorService() {
279287
return executorFactory.get();
280288
}
289+
290+
public boolean isLifeCycleCEPublishingEnabled() {
291+
return lifeCycleCEPublishingEnabled;
292+
}
281293
}

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java

Lines changed: 99 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818
import static io.serverlessworkflow.impl.lifecycle.ce.WorkflowDefinitionCEData.ref;
1919
import static io.serverlessworkflow.impl.lifecycle.ce.WorkflowErrorCEData.error;
2020

21+
import io.cloudevents.CloudEvent;
2122
import io.cloudevents.CloudEventData;
2223
import io.cloudevents.core.builder.CloudEventBuilder;
2324
import io.cloudevents.core.data.PojoCloudEventData;
2425
import io.cloudevents.core.data.PojoCloudEventData.ToBytes;
26+
import io.serverlessworkflow.impl.WorkflowApplication;
2527
import io.serverlessworkflow.impl.WorkflowModel;
2628
import io.serverlessworkflow.impl.events.CloudEventUtils;
27-
import io.serverlessworkflow.impl.events.EventPublisher;
2829
import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent;
2930
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
3031
import io.serverlessworkflow.impl.lifecycle.TaskEvent;
@@ -41,13 +42,15 @@
4142
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
4243
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
4344
import java.time.OffsetDateTime;
45+
import java.util.function.Function;
4446

4547
public abstract class AbstractLifeCyclePublisher implements WorkflowExecutionListener {
4648

4749
@Override
48-
public void onTaskStarted(TaskStartedEvent ev) {
49-
eventPublisher(ev)
50-
.publish(
50+
public void onTaskStarted(TaskStartedEvent event) {
51+
publish(
52+
event,
53+
ev ->
5154
builder()
5255
.withData(
5356
cloudEventData(
@@ -58,9 +61,10 @@ public void onTaskStarted(TaskStartedEvent ev) {
5861
}
5962

6063
@Override
61-
public void onTaskCompleted(TaskCompletedEvent ev) {
62-
eventPublisher(ev)
63-
.publish(
64+
public void onTaskCompleted(TaskCompletedEvent event) {
65+
publish(
66+
event,
67+
ev ->
6468
builder()
6569
.withData(
6670
cloudEventData(
@@ -72,9 +76,10 @@ public void onTaskCompleted(TaskCompletedEvent ev) {
7276
}
7377

7478
@Override
75-
public void onTaskSuspended(TaskSuspendedEvent ev) {
76-
eventPublisher(ev)
77-
.publish(
79+
public void onTaskSuspended(TaskSuspendedEvent event) {
80+
publish(
81+
event,
82+
ev ->
7883
builder()
7984
.withData(
8085
cloudEventData(
@@ -85,9 +90,10 @@ public void onTaskSuspended(TaskSuspendedEvent ev) {
8590
}
8691

8792
@Override
88-
public void onTaskResumed(TaskResumedEvent ev) {
89-
eventPublisher(ev)
90-
.publish(
93+
public void onTaskResumed(TaskResumedEvent event) {
94+
publish(
95+
event,
96+
ev ->
9197
builder()
9298
.withData(
9399
cloudEventData(
@@ -98,9 +104,10 @@ public void onTaskResumed(TaskResumedEvent ev) {
98104
}
99105

100106
@Override
101-
public void onTaskCancelled(TaskCancelledEvent ev) {
102-
eventPublisher(ev)
103-
.publish(
107+
public void onTaskCancelled(TaskCancelledEvent event) {
108+
publish(
109+
event,
110+
ev ->
104111
builder()
105112
.withData(
106113
cloudEventData(
@@ -111,9 +118,10 @@ public void onTaskCancelled(TaskCancelledEvent ev) {
111118
}
112119

113120
@Override
114-
public void onTaskFailed(TaskFailedEvent ev) {
115-
eventPublisher(ev)
116-
.publish(
121+
public void onTaskFailed(TaskFailedEvent event) {
122+
publish(
123+
event,
124+
ev ->
117125
builder()
118126
.withData(
119127
cloudEventData(
@@ -124,9 +132,10 @@ public void onTaskFailed(TaskFailedEvent ev) {
124132
}
125133

126134
@Override
127-
public void onWorkflowStarted(WorkflowStartedEvent ev) {
128-
eventPublisher(ev)
129-
.publish(
135+
public void onWorkflowStarted(WorkflowStartedEvent event) {
136+
publish(
137+
event,
138+
ev ->
130139
builder()
131140
.withData(
132141
cloudEventData(
@@ -136,9 +145,10 @@ public void onWorkflowStarted(WorkflowStartedEvent ev) {
136145
}
137146

138147
@Override
139-
public void onWorkflowSuspended(WorkflowSuspendedEvent ev) {
140-
eventPublisher(ev)
141-
.publish(
148+
public void onWorkflowSuspended(WorkflowSuspendedEvent event) {
149+
publish(
150+
event,
151+
ev ->
142152
builder()
143153
.withData(
144154
cloudEventData(
@@ -149,9 +159,10 @@ public void onWorkflowSuspended(WorkflowSuspendedEvent ev) {
149159
}
150160

151161
@Override
152-
public void onWorkflowCancelled(WorkflowCancelledEvent ev) {
153-
eventPublisher(ev)
154-
.publish(
162+
public void onWorkflowCancelled(WorkflowCancelledEvent event) {
163+
publish(
164+
event,
165+
ev ->
155166
builder()
156167
.withData(
157168
cloudEventData(
@@ -162,9 +173,10 @@ public void onWorkflowCancelled(WorkflowCancelledEvent ev) {
162173
}
163174

164175
@Override
165-
public void onWorkflowResumed(WorkflowResumedEvent ev) {
166-
eventPublisher(ev)
167-
.publish(
176+
public void onWorkflowResumed(WorkflowResumedEvent event) {
177+
publish(
178+
event,
179+
ev ->
168180
builder()
169181
.withData(
170182
cloudEventData(
@@ -174,9 +186,10 @@ public void onWorkflowResumed(WorkflowResumedEvent ev) {
174186
}
175187

176188
@Override
177-
public void onWorkflowCompleted(WorkflowCompletedEvent ev) {
178-
eventPublisher(ev)
179-
.publish(
189+
public void onWorkflowCompleted(WorkflowCompletedEvent event) {
190+
publish(
191+
event,
192+
ev ->
180193
builder()
181194
.withData(
182195
cloudEventData(
@@ -187,9 +200,10 @@ public void onWorkflowCompleted(WorkflowCompletedEvent ev) {
187200
}
188201

189202
@Override
190-
public void onWorkflowFailed(WorkflowFailedEvent ev) {
191-
eventPublisher(ev)
192-
.publish(
203+
public void onWorkflowFailed(WorkflowFailedEvent event) {
204+
publish(
205+
event,
206+
ev ->
193207
builder()
194208
.withData(
195209
cloudEventData(
@@ -199,29 +213,65 @@ public void onWorkflowFailed(WorkflowFailedEvent ev) {
199213
.build());
200214
}
201215

202-
protected abstract byte[] convert(WorkflowStartedCEData data);
216+
protected byte[] convert(WorkflowStartedCEData data) {
217+
return convertToBytes(data);
218+
}
203219

204-
protected abstract byte[] convert(WorkflowSuspendedCEData data);
220+
protected byte[] convert(WorkflowCompletedCEData data) {
221+
return convertToBytes(data);
222+
}
205223

206-
protected abstract byte[] convert(WorkflowResumedCEData data);
224+
protected byte[] convert(TaskStartedCEData data) {
225+
return convertToBytes(data);
226+
}
207227

208-
protected abstract byte[] convert(WorkflowCancelledCEData data);
228+
protected byte[] convert(TaskCompletedCEData data) {
229+
return convertToBytes(data);
230+
}
209231

210-
protected abstract byte[] convert(WorkflowCompletedCEData data);
232+
protected byte[] convert(TaskFailedCEData data) {
233+
return convertToBytes(data);
234+
}
211235

212-
protected abstract byte[] convert(TaskStartedCEData data);
236+
protected byte[] convert(WorkflowFailedCEData data) {
237+
return convertToBytes(data);
238+
}
213239

214-
protected abstract byte[] convert(TaskCompletedCEData data);
240+
protected byte[] convert(WorkflowSuspendedCEData data) {
241+
return convertToBytes(data);
242+
}
215243

216-
protected abstract byte[] convert(TaskFailedCEData data);
244+
protected byte[] convert(WorkflowResumedCEData data) {
245+
return convertToBytes(data);
246+
}
217247

218-
protected abstract byte[] convert(TaskSuspendedCEData data);
248+
protected byte[] convert(WorkflowCancelledCEData data) {
249+
return convertToBytes(data);
250+
}
219251

220-
protected abstract byte[] convert(TaskCancelledCEData data);
252+
protected byte[] convert(TaskSuspendedCEData data) {
253+
return convertToBytes(data);
254+
}
255+
256+
protected byte[] convert(TaskCancelledCEData data) {
257+
return convertToBytes(data);
258+
}
259+
260+
protected byte[] convert(TaskResumedCEData data) {
261+
return convertToBytes(data);
262+
}
221263

222-
protected abstract byte[] convert(TaskResumedCEData data);
264+
protected abstract <T> byte[] convertToBytes(T data);
223265

224-
protected abstract byte[] convert(WorkflowFailedCEData data);
266+
/* By default, generated cloud events are published, if user has not disabled them at application level,
267+
* using application event publisher. That might be changed if needed by children.
268+
*/
269+
protected <T extends WorkflowEvent> void publish(T ev, Function<T, CloudEvent> ce) {
270+
WorkflowApplication application = ev.workflowContext().definition().application();
271+
if (application.isLifeCycleCEPublishingEnabled()) {
272+
application.eventPublisher().publish(ce.apply(ev));
273+
}
274+
}
225275

226276
private static <T> CloudEventData cloudEventData(T data, ToBytes<T> toBytes) {
227277
return PojoCloudEventData.wrap(data, toBytes);
@@ -246,10 +296,6 @@ private static Object output(WorkflowEvent ev) {
246296
return from(ev.workflowContext().instanceData().output());
247297
}
248298

249-
private static EventPublisher eventPublisher(WorkflowEvent ev) {
250-
return ev.workflowContext().definition().application().eventPublisher();
251-
}
252-
253299
private static Object output(TaskEvent ev) {
254300
return from(ev.taskContext().output());
255301
}

0 commit comments

Comments
 (0)