Skip to content

Commit 8b4ee36

Browse files
committed
[Fix #847] Alternative implementation
Signed-off-by: fjtirado <[email protected]>
1 parent 0d40db7 commit 8b4ee36

File tree

7 files changed

+135
-53
lines changed

7 files changed

+135
-53
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import static io.serverlessworkflow.impl.WorkflowUtils.*;
19+
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
1920

2021
import io.serverlessworkflow.api.types.Input;
2122
import io.serverlessworkflow.api.types.ListenTo;
2223
import io.serverlessworkflow.api.types.Output;
2324
import io.serverlessworkflow.api.types.Schedule;
2425
import io.serverlessworkflow.api.types.Workflow;
2526
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
26-
import io.serverlessworkflow.impl.events.EventRegistrationInfo;
2727
import io.serverlessworkflow.impl.executors.TaskExecutor;
2828
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
2929
import io.serverlessworkflow.impl.resources.ResourceLoader;
@@ -41,11 +41,11 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
4141
private Optional<SchemaValidator> outputSchemaValidator = Optional.empty();
4242
private Optional<WorkflowFilter> inputFilter = Optional.empty();
4343
private Optional<WorkflowFilter> outputFilter = Optional.empty();
44-
private EventRegistrationInfo registrationInfo;
4544
private final WorkflowApplication application;
4645
private final TaskExecutor<?> taskExecutor;
4746
private final ResourceLoader resourceLoader;
4847
private final Map<String, TaskExecutor<?>> executors = new HashMap<>();
48+
private ScheduledEventConsumer scheculedConsumer;
4949

5050
private WorkflowDefinition(
5151
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
@@ -83,32 +83,18 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
8383
if (schedule != null) {
8484
ListenTo to = schedule.getOn();
8585
if (to != null) {
86-
definition.register(
87-
application.scheduler().eventConsumer(definition, application.modelFactory()::from),
88-
EventRegistrationBuilderInfo.from(application, to, x -> null));
86+
definition.scheculedConsumer =
87+
application
88+
.scheduler()
89+
.eventConsumer(
90+
definition,
91+
application.modelFactory()::from,
92+
EventRegistrationBuilderInfo.from(application, to, x -> null));
8993
}
9094
}
9195
return definition;
9296
}
9397

94-
private void register(ScheduledEventConsumer consumer, EventRegistrationBuilderInfo builderInfo) {
95-
WorkflowModelCollection model = application.modelFactory().createCollection();
96-
registrationInfo =
97-
EventRegistrationInfo.<WorkflowModel>build(
98-
builderInfo.registrations(),
99-
(ce, f) -> consumer.accept(ce, f, model),
100-
application.eventConsumer());
101-
registrationInfo
102-
.completableFuture()
103-
.thenAccept(
104-
x -> {
105-
EventRegistrationInfo prevRegistrationInfo = registrationInfo;
106-
register(consumer, builderInfo);
107-
consumer.start(model);
108-
prevRegistrationInfo.registrations().forEach(application.eventConsumer()::unregister);
109-
});
110-
}
111-
11298
public WorkflowInstance instance(Object input) {
11399
WorkflowModel inputModel = application.modelFactory().fromAny(input);
114100
inputSchemaValidator().ifPresent(v -> v.validate(inputModel));
@@ -159,8 +145,6 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor<?> ta
159145

160146
@Override
161147
public void close() {
162-
if (registrationInfo != null) {
163-
registrationInfo.registrations().forEach(application.eventConsumer()::unregister);
164-
}
148+
safeClose(scheculedConsumer);
165149
}
166150
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
1920
import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer;
2021
import java.util.Collection;
2122
import java.util.function.Function;
@@ -24,5 +25,7 @@ public interface WorkflowScheduler {
2425
Collection<WorkflowInstance> scheduledInstances();
2526

2627
ScheduledEventConsumer eventConsumer(
27-
WorkflowDefinition definition, Function<CloudEvent, WorkflowModel> converter);
28+
WorkflowDefinition definition,
29+
Function<CloudEvent, WorkflowModel> converter,
30+
EventRegistrationBuilderInfo info);
2831
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,16 @@
2020
import io.serverlessworkflow.impl.WorkflowInstance;
2121
import io.serverlessworkflow.impl.WorkflowModel;
2222
import io.serverlessworkflow.impl.WorkflowScheduler;
23+
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
2324
import java.util.ArrayList;
2425
import java.util.Collection;
2526
import java.util.Collections;
2627
import java.util.function.Function;
2728

2829
public class DefaultWorkflowScheduler implements WorkflowScheduler {
2930

30-
private Collection<WorkflowInstance> instances = new ArrayList<>();
31+
private Collection<WorkflowInstance> instances =
32+
Collections.synchronizedCollection(new ArrayList<>());
3133

3234
@Override
3335
public Collection<WorkflowInstance> scheduledInstances() {
@@ -36,8 +38,10 @@ public Collection<WorkflowInstance> scheduledInstances() {
3638

3739
@Override
3840
public ScheduledEventConsumer eventConsumer(
39-
WorkflowDefinition definition, Function<CloudEvent, WorkflowModel> converter) {
40-
return new ScheduledEventConsumer(definition, converter) {
41+
WorkflowDefinition definition,
42+
Function<CloudEvent, WorkflowModel> converter,
43+
EventRegistrationBuilderInfo builderInfo) {
44+
return new ScheduledEventConsumer(definition, converter, builderInfo) {
4145
@Override
4246
protected void addScheduledInstance(WorkflowInstance instance) {
4347
instances.add(instance);

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,105 @@
2020
import io.serverlessworkflow.impl.WorkflowInstance;
2121
import io.serverlessworkflow.impl.WorkflowModel;
2222
import io.serverlessworkflow.impl.WorkflowModelCollection;
23-
import java.util.concurrent.CompletableFuture;
23+
import io.serverlessworkflow.impl.events.EventConsumer;
24+
import io.serverlessworkflow.impl.events.EventRegistration;
25+
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
26+
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
27+
import java.util.ArrayList;
28+
import java.util.Collection;
29+
import java.util.HashMap;
30+
import java.util.List;
31+
import java.util.Map;
2432
import java.util.function.Function;
2533

26-
public abstract class ScheduledEventConsumer {
34+
public abstract class ScheduledEventConsumer implements AutoCloseable {
2735

2836
private final Function<CloudEvent, WorkflowModel> converter;
2937
private final WorkflowDefinition definition;
38+
private final EventRegistrationBuilderInfo builderInfo;
39+
private final EventConsumer eventConsumer;
40+
private Map<EventRegistrationBuilder, List<CloudEvent>> correlatedEvents;
41+
private Collection<EventRegistration> registrations = new ArrayList<>();
3042

3143
protected ScheduledEventConsumer(
32-
WorkflowDefinition definition, Function<CloudEvent, WorkflowModel> converter) {
44+
WorkflowDefinition definition,
45+
Function<CloudEvent, WorkflowModel> converter,
46+
EventRegistrationBuilderInfo builderInfo) {
3347
this.definition = definition;
3448
this.converter = converter;
49+
this.builderInfo = builderInfo;
50+
this.eventConsumer = definition.application().eventConsumer();
51+
if (builderInfo.registrations().isAnd()) {
52+
this.correlatedEvents = new HashMap<>();
53+
builderInfo
54+
.registrations()
55+
.registrations()
56+
.forEach(
57+
reg -> {
58+
correlatedEvents.put(reg, new ArrayList<>());
59+
registrations.add(
60+
eventConsumer.register(
61+
reg,
62+
ce -> {
63+
Collection<Collection<CloudEvent>> collections = new ArrayList<>();
64+
// to minimize the critical section, conversion is done later, here we are performing
65+
// just collection, if any
66+
synchronized (correlatedEvents) {
67+
correlatedEvents.get(reg).add((CloudEvent) ce);
68+
while (satisfyCondition()) {
69+
Collection<CloudEvent> collection = new ArrayList<>();
70+
for (List<CloudEvent> values : correlatedEvents.values()) {
71+
collection.add(values.remove(0));
72+
}
73+
collections.add(collection);
74+
}
75+
}
76+
// convert and start outside synchronized
77+
collections.forEach(this::start);
78+
}));
79+
});
80+
} else {
81+
builderInfo
82+
.registrations()
83+
.registrations()
84+
.forEach(
85+
reg -> registrations.add(eventConsumer.register(reg, ce -> start((CloudEvent) ce))));
86+
}
3587
}
3688

37-
public void accept(
38-
CloudEvent t, CompletableFuture<WorkflowModel> u, WorkflowModelCollection col) {
39-
WorkflowModel model = converter.apply(t);
40-
col.add(model);
41-
u.complete(model);
89+
private boolean satisfyCondition() {
90+
for (List<CloudEvent> values : correlatedEvents.values()) {
91+
if (values.isEmpty()) {
92+
return false;
93+
}
94+
}
95+
return true;
4296
}
4397

44-
public void start(Object model) {
98+
protected void start(CloudEvent ce) {
99+
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
100+
model.add(converter.apply(ce));
101+
start(model);
102+
}
103+
104+
protected void start(Collection<CloudEvent> ces) {
105+
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
106+
ces.forEach(ce -> model.add(converter.apply(ce)));
107+
start(model);
108+
}
109+
110+
private void start(WorkflowModel model) {
45111
WorkflowInstance instance = definition.instance(model);
46112
addScheduledInstance(instance);
47113
instance.start();
48114
}
49115

116+
public void close() {
117+
if (correlatedEvents != null) {
118+
correlatedEvents.clear();
119+
}
120+
registrations.forEach(eventConsumer::unregister);
121+
}
122+
50123
protected abstract void addScheduledInstance(WorkflowInstance instace);
51124
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventTest.java renamed to impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,21 +32,21 @@
3232
import java.util.List;
3333
import java.util.Map;
3434
import java.util.concurrent.ExecutionException;
35-
import org.junit.jupiter.api.AfterAll;
36-
import org.junit.jupiter.api.BeforeAll;
35+
import org.junit.jupiter.api.AfterEach;
36+
import org.junit.jupiter.api.BeforeEach;
3737
import org.junit.jupiter.api.Test;
3838

39-
class ScheduleEventTest {
39+
class ScheduleEventConsumerTest {
4040

41-
private static WorkflowApplication appl;
41+
private WorkflowApplication appl;
4242

43-
@BeforeAll
44-
static void init() throws IOException {
43+
@BeforeEach
44+
void init() throws IOException {
4545
appl = WorkflowApplication.builder().build();
4646
}
4747

48-
@AfterAll
49-
static void tearDown() throws IOException {
48+
@AfterEach
49+
void tearDown() throws IOException {
5050
appl.close();
5151
}
5252

@@ -65,8 +65,26 @@ void testStartUsingEvent() throws IOException, InterruptedException, ExecutionEx
6565
.atMost(Duration.ofMillis(200))
6666
.until(() -> instances.size() == 2);
6767
List<Object> outputs = instances.stream().map(i -> i.output().asJavaObject()).toList();
68-
assertThat(outputs.get(0)).isEqualTo(Map.of("recovered", "Javierito"));
69-
assertThat(outputs.get(1)).isEqualTo(Map.of("recovered", "Fulanito"));
68+
assertThat(outputs)
69+
.containsExactlyInAnyOrder(
70+
Map.of("recovered", "Javierito"), Map.of("recovered", "Fulanito"));
71+
}
72+
73+
@Test
74+
void testStartUsingConsecutiveEvent()
75+
throws IOException, InterruptedException, ExecutionException {
76+
appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/listen-start.yaml"));
77+
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Javierito"))));
78+
appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito"))));
79+
Collection<WorkflowInstance> instances = appl.scheduler().scheduledInstances();
80+
await()
81+
.pollDelay(Duration.ofMillis(25))
82+
.atMost(Duration.ofMillis(500))
83+
.until(() -> instances.stream().filter(i -> i.output() != null).count() == 2);
84+
List<Object> outputs = instances.stream().map(i -> i.output().asJavaObject()).toList();
85+
assertThat(outputs)
86+
.containsExactlyInAnyOrder(
87+
Map.of("recovered", "Javierito"), Map.of("recovered", "Fulanito"));
7088
}
7189

7290
private CloudEvent buildCloudEvent(Object data) {

impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ public class TraceExecutionListener implements WorkflowExecutionListener {
3737
private static final Logger logger = LoggerFactory.getLogger(TraceExecutionListener.class);
3838

3939
public void onWorkflowStarted(WorkflowStartedEvent ev) {
40-
4140
logger.info(
42-
"Workflow definition {} with id {} started at {}",
41+
"Workflow definition {} with id {} started at {} with data {}",
4342
ev.workflowContext().definition().workflow().getDocument().getName(),
4443
ev.workflowContext().instanceData().id(),
45-
ev.eventDate());
44+
ev.eventDate(),
45+
ev.workflowContext().instanceData().input());
4646
}
4747

4848
public void onWorkflowResumed(WorkflowResumedEvent ev) {

impl/test/src/test/resources/workflows-samples/listen-start.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
document:
22
dsl: '1.0.1'
3-
namespace: examples
3+
namespace: test
44
name: event-driven-schedule
55
version: '0.1.0'
66
schedule:

0 commit comments

Comments
 (0)