Skip to content

Commit f4dcbdb

Browse files
fjtiradomatheusandre1
authored andcommitted
Review comments
Signed-off-by: Matheus André <matheusandr2@gmail.com>
1 parent 4ffacf0 commit f4dcbdb

8 files changed

Lines changed: 100 additions & 51 deletions

File tree

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,6 @@ public interface WorkflowInstance extends WorkflowInstanceData {
5252
boolean resume();
5353

5454
<T> T addMetadataIfAbsent(String key, Supplier<T> supplier);
55+
56+
void removeMetadataIfPresent(String key);
5557
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,8 +355,9 @@ public <T> T addMetadataIfAbsent(String key, Supplier<T> supplier) {
355355
return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get());
356356
}
357357

358-
public Object computeCorrelationValue(String key, Object value) {
359-
return additionalObjects.computeIfAbsent(key, k -> value);
358+
@Override
359+
public void removeMetadataIfPresent(String key) {
360+
additionalObjects.remove(key);
360361
}
361362

362363
@Override

impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import io.serverlessworkflow.impl.WorkflowApplication;
2525
import io.serverlessworkflow.impl.WorkflowContext;
2626
import io.serverlessworkflow.impl.WorkflowModel;
27-
import io.serverlessworkflow.impl.WorkflowModelFactory;
2827
import java.util.AbstractCollection;
2928
import java.util.ArrayList;
3029
import java.util.Collection;
@@ -42,6 +41,8 @@ public abstract class AbstractTypeConsumer
4241

4342
private static final Logger logger = LoggerFactory.getLogger(AbstractTypeConsumer.class);
4443

44+
private static final CloudEventPredicate ALWAYS_TRUE = (ce, wf, t) -> true;
45+
4546
protected abstract void registerToAll(Consumer<CloudEvent> consumer);
4647

4748
protected abstract void unregisterFromAll();
@@ -61,8 +62,7 @@ public TypeEventRegistrationBuilder listen(
6162
application.cloudEventPredicateFactory().build(application, properties);
6263
Collection<CloudEventPredicate> correlationPredicates =
6364
buildCorrelationPredicates(register.getCorrelate(), application);
64-
return new TypeEventRegistrationBuilder(
65-
type, cePredicate, correlationPredicates, application.modelFactory());
65+
return new TypeEventRegistrationBuilder(type, cePredicate, correlationPredicates);
6666
}
6767

6868
private Collection<CloudEventPredicate> buildCorrelationPredicates(
@@ -83,43 +83,42 @@ private Collection<CloudEventPredicate> buildCorrelationPredicates(
8383

8484
@Override
8585
public Collection<TypeEventRegistrationBuilder> listenToAll(WorkflowApplication application) {
86-
return List.of(
87-
new TypeEventRegistrationBuilder(null, null, List.of(), application.modelFactory()));
86+
return List.of(new TypeEventRegistrationBuilder(null, ALWAYS_TRUE, List.of()));
8887
}
8988

9089
private static class CloudEventConsumer extends AbstractCollection<TypeEventRegistration>
9190
implements Consumer<CloudEvent> {
92-
private final WorkflowModelFactory modelFactory;
9391
private Collection<TypeEventRegistration> registrations = new CopyOnWriteArrayList<>();
9492

95-
CloudEventConsumer(WorkflowModelFactory modelFactory) {
96-
this.modelFactory = modelFactory;
97-
}
98-
9993
@Override
10094
public void accept(CloudEvent ce) {
10195
logger.debug("Received cloud event {}", ce);
96+
WorkflowModel eventModel = null;
10297
for (TypeEventRegistration registration : registrations) {
103-
if (registration.predicate().test(ce, registration.workflow(), registration.task())) {
104-
if (!testCorrelation(ce, registration)) {
98+
if (!registration.predicate().test(ce, registration.workflow(), registration.task())) {
99+
continue;
100+
}
101+
Collection<CloudEventPredicate> correlationPredicates =
102+
registration.correlationPredicates();
103+
if (!correlationPredicates.isEmpty()) {
104+
if (eventModel == null
105+
&& correlationPredicates.stream()
106+
.anyMatch(ModelAwareCloudEventPredicate.class::isInstance)) {
107+
eventModel = registration.workflow().definition().application().modelFactory().from(ce);
108+
}
109+
if (!testCorrelation(ce, registration, eventModel)) {
105110
continue;
106111
}
107-
registration.consumer().accept(ce);
108112
}
113+
registration.consumer().accept(ce);
109114
}
110115
}
111116

112-
private boolean testCorrelation(CloudEvent ce, TypeEventRegistration registration) {
117+
private boolean testCorrelation(
118+
CloudEvent ce, TypeEventRegistration registration, WorkflowModel eventModel) {
113119
Collection<CloudEventPredicate> predicates = registration.correlationPredicates();
114-
if (predicates.isEmpty()) {
115-
return true;
116-
}
117-
WorkflowModel eventModel = null;
118120
for (CloudEventPredicate pred : predicates) {
119121
if (pred instanceof ModelAwareCloudEventPredicate ma) {
120-
if (eventModel == null) {
121-
eventModel = modelFactory.from(ce);
122-
}
123122
if (!ma.test(eventModel, registration.workflow(), registration.task())) {
124123
return false;
125124
}
@@ -161,7 +160,7 @@ public TypeEventRegistration register(
161160
TaskContext task) {
162161
if (builder.type() == null) {
163162
registerToAll(ce);
164-
return new TypeEventRegistration(null, ce, null, workflow, task);
163+
return new TypeEventRegistration(null, ce, ALWAYS_TRUE, workflow, task);
165164
} else {
166165
TypeEventRegistration registration =
167166
new TypeEventRegistration(
@@ -175,7 +174,7 @@ public TypeEventRegistration register(
175174
.computeIfAbsent(
176175
registration.type(),
177176
k -> {
178-
CloudEventConsumer consumer = new CloudEventConsumer(builder.modelFactory());
177+
CloudEventConsumer consumer = new CloudEventConsumer();
179178
register(k, consumer);
180179
return consumer;
181180
})
@@ -201,5 +200,17 @@ public void unregister(TypeEventRegistration registration) {
201200
}
202201
});
203202
}
203+
cleanupCorrelationState(registration);
204+
}
205+
206+
private void cleanupCorrelationState(TypeEventRegistration registration) {
207+
for (CloudEventPredicate pred : registration.correlationPredicates()) {
208+
if (pred instanceof CorrelationPredicate cp) {
209+
String key = cp.stateKey(registration.task());
210+
if (key != null) {
211+
registration.workflow().instance().removeMetadataIfPresent(key);
212+
}
213+
}
214+
}
204215
}
205216
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/CorrelationPredicate.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,16 @@ public static CorrelationPredicate from(
5656
}
5757

5858
private String correlationStateKey(TaskContext task) {
59-
return "correlation:" + task.position().jsonPointer() + ":" + correlationKey;
59+
return "correlation:"
60+
+ task.position().jsonPointer()
61+
+ ":"
62+
+ task.iteration()
63+
+ ":"
64+
+ correlationKey;
65+
}
66+
67+
String stateKey(TaskContext task) {
68+
return expectResolver == null ? correlationStateKey(task) : null;
6069
}
6170

6271
@Override
@@ -75,7 +84,7 @@ public boolean test(WorkflowModel eventModel, WorkflowContext workflow, TaskCont
7584

7685
if (expectResolver == null) {
7786
String stateKey = correlationStateKey(task);
78-
Object firstValue = workflow.instance().computeCorrelationValue(stateKey, eventValue);
87+
Object firstValue = workflow.instance().addMetadataIfAbsent(stateKey, () -> eventValue);
7988
boolean result = Objects.equals(eventValue, firstValue);
8089
logger.debug(
8190
"Correlation no expect, eventValue='{}', firstValue='{}', match={}",

impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistrationBuilder.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,16 @@
1515
*/
1616
package io.serverlessworkflow.impl.events;
1717

18-
import io.serverlessworkflow.impl.WorkflowModelFactory;
1918
import java.util.Collection;
2019
import java.util.Collections;
2120

2221
public record TypeEventRegistrationBuilder(
2322
String type,
2423
CloudEventPredicate cePredicate,
25-
Collection<CloudEventPredicate> correlationPredicates,
26-
WorkflowModelFactory modelFactory)
24+
Collection<CloudEventPredicate> correlationPredicates)
2725
implements EventRegistrationBuilder {
2826

2927
public TypeEventRegistrationBuilder(String type, CloudEventPredicate cePredicate) {
30-
this(type, cePredicate, Collections.emptyList(), null);
28+
this(type, cePredicate, Collections.emptyList());
3129
}
3230
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/CorrelationTest.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import java.util.stream.Stream;
4343
import org.junit.jupiter.api.AfterAll;
4444
import org.junit.jupiter.api.BeforeAll;
45-
import org.junit.jupiter.api.Test;
4645
import org.junit.jupiter.params.ParameterizedTest;
4746
import org.junit.jupiter.params.provider.Arguments;
4847
import org.junit.jupiter.params.provider.MethodSource;
@@ -83,8 +82,11 @@ void testCorrelateMatch(String sourceName, Workflow workflow) throws Exception {
8382
Map.of("patientId", "P123", "name", "John"))));
8483

8584
WorkflowModel result = future.get(2, TimeUnit.SECONDS);
86-
List<Object> output = (List<Object>) JsonUtils.toJavaValue(JsonUtils.modelToJson(result));
85+
Object outputValue = JsonUtils.toJavaValue(JsonUtils.modelToJson(result));
86+
assertThat(outputValue).isInstanceOf(List.class);
87+
List<?> output = (List<?>) outputValue;
8788
assertThat(output).hasSize(1);
89+
assertThat(output.get(0)).isInstanceOf(Map.class);
8890
Map<String, Object> eventData = (Map<String, Object>) output.get(0);
8991
assertThat(eventData).containsEntry("patientId", "P123");
9092
assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED);
@@ -96,11 +98,6 @@ void testCorrelateNoMatch(String sourceName, Workflow workflow) throws Exception
9698
assertCorrelateNoMatch(workflow);
9799
}
98100

99-
@Test
100-
void testCorrelateNoMatchDsl() throws Exception {
101-
assertCorrelateNoMatch(listenCorrelateWorkflow());
102-
}
103-
104101
private static Stream<Arguments> correlateWorkflowSources() throws IOException {
105102
return Stream.of(
106103
readWorkflowFromClasspath("workflows-samples/listen-correlate.yaml"),
@@ -110,7 +107,7 @@ private static Stream<Arguments> correlateWorkflowSources() throws IOException {
110107

111108
private static Workflow listenCorrelateWorkflow() {
112109
return WorkflowBuilder.workflow("listen-correlate-java-dsl", "test", "0.1.0")
113-
.input(i -> i.from("{ patientId: .patientId }"))
110+
.input(i -> i.from("{ id: .patientId }"))
114111
.tasks(
115112
doTasks(
116113
listen(
@@ -127,9 +124,7 @@ private static Workflow listenCorrelateWorkflow() {
127124
"com.example.hospital.patient.admitted"))
128125
.correlate(
129126
"patientId",
130-
cp ->
131-
cp.from(".data.patientId")
132-
.expect(".patientId")))))))
127+
cp -> cp.from(".data.patientId").expect(".id")))))))
133128
.build();
134129
}
135130

@@ -192,9 +187,16 @@ private static Workflow listenCorrelateNoExpectWorkflow() {
192187
.build();
193188
}
194189

195-
@Test
196-
void testCorrelateNoExpectMatch() throws Exception {
197-
Workflow workflow = listenCorrelateNoExpectWorkflow();
190+
private static Stream<Arguments> correlateNoExpectWorkflowSources() throws IOException {
191+
return Stream.of(
192+
readWorkflowFromClasspath("workflows-samples/listen-correlate-no-expect.yaml"),
193+
listenCorrelateNoExpectWorkflow())
194+
.map(wf -> Arguments.of(wf.getDocument().getName(), wf));
195+
}
196+
197+
@ParameterizedTest(name = "{0}")
198+
@MethodSource("correlateNoExpectWorkflowSources")
199+
void testCorrelateNoExpectMatch(String sourceName, Workflow workflow) throws Exception {
198200
WorkflowDefinition def = appl.workflowDefinition(workflow);
199201
WorkflowInstance instance = def.instance();
200202
CompletableFuture<WorkflowModel> future = instance.start();
@@ -213,16 +215,23 @@ void testCorrelateNoExpectMatch() throws Exception {
213215
Map.of("patientId", "P123", "name", "John"))));
214216

215217
WorkflowModel result = future.get(2, TimeUnit.SECONDS);
216-
List<Object> output = (List<Object>) JsonUtils.toJavaValue(JsonUtils.modelToJson(result));
218+
Object outputValue = JsonUtils.toJavaValue(JsonUtils.modelToJson(result));
219+
assertThat(outputValue).isInstanceOf(List.class);
220+
List<?> output = (List<?>) outputValue;
217221
assertThat(output).hasSize(1);
222+
assertThat(output.get(0)).isInstanceOf(Map.class);
218223
Map<String, Object> eventData = (Map<String, Object>) output.get(0);
219224
assertThat(eventData).containsEntry("patientId", "P123");
220225
assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED);
221226
}
222227

223-
@Test
224-
void testCorrelateNoExpectNoMatch() throws Exception {
225-
Workflow workflow = listenCorrelateNoExpectWorkflow();
228+
@ParameterizedTest(name = "{0}")
229+
@MethodSource("correlateNoExpectWorkflowSources")
230+
void testCorrelateNoExpectNoMatch(String sourceName, Workflow workflow) throws Exception {
231+
assertCorrelateNoExpectNoMatch(workflow);
232+
}
233+
234+
private void assertCorrelateNoExpectNoMatch(Workflow workflow) throws Exception {
226235
WorkflowDefinition def = appl.workflowDefinition(workflow);
227236
WorkflowInstance instance = def.instance();
228237
CompletableFuture<WorkflowModel> future = instance.start();
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
document:
2+
dsl: '1.0.0-alpha5'
3+
namespace: test
4+
name: listen-correlate-no-expect
5+
version: '0.1.0'
6+
do:
7+
- waitForPatient:
8+
listen:
9+
to:
10+
one:
11+
with:
12+
type: com.example.hospital.patient.admitted
13+
correlate:
14+
patientId:
15+
from: .data.patientId

impl/test/src/test/resources/workflows-samples/listen-correlate.yaml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ document:
44
name: listen-correlate
55
version: '0.1.0'
66
input:
7-
from: '{ patientId: .patientId }'
7+
# Raw input shape: { "patientId": "P123" }
8+
# Transforms to: { "id": "P123" } — reshapes patientId into id field
9+
from: '{ id: .patientId }'
810
do:
911
- waitForPatient:
1012
listen:
@@ -15,4 +17,6 @@ do:
1517
correlate:
1618
patientId:
1719
from: .data.patientId
18-
expect: .patientId
20+
# expect is evaluated against the task input (post input.from transform),
21+
# so .id resolves to the reshaped id field in the transformed input
22+
expect: .id

0 commit comments

Comments
 (0)