Skip to content

Commit 2b5babd

Browse files
fix: WorkflowRegistry is not fetching cacheDescriptor (#301)
1 parent 8a80a48 commit 2b5babd

File tree

4 files changed

+83
-7
lines changed

4 files changed

+83
-7
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,4 @@ pom.xml.tag
6262
pom.xml.releaseBackup
6363
pom.xml.versionsBackup
6464
release.properties
65+
.m2/
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.quarkiverse.flow.deployment.test.devui;
2+
3+
import jakarta.enterprise.context.ApplicationScoped;
4+
import jakarta.enterprise.event.Observes;
5+
6+
import io.quarkiverse.flow.internal.WorkflowRegistry;
7+
import io.quarkus.runtime.StartupEvent;
8+
import io.serverlessworkflow.api.types.Workflow;
9+
import io.serverlessworkflow.fluent.spec.WorkflowBuilder;
10+
11+
@ApplicationScoped
12+
public class CachedDevUIDescriptorObserver {
13+
14+
void onStart(@Observes StartupEvent event, WorkflowRegistry registry) {
15+
Workflow descriptor = WorkflowBuilder.workflow("cached-devui-workflow")
16+
.tasks(t -> t.set("cachedTask", """
17+
{ "message": "cached" }
18+
"""))
19+
.build();
20+
21+
registry.cacheDescriptor(descriptor);
22+
}
23+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.quarkiverse.flow.deployment.test.devui;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
5+
import org.jboss.shrinkwrap.api.ShrinkWrap;
6+
import org.jboss.shrinkwrap.api.spec.JavaArchive;
7+
import org.junit.jupiter.api.Test;
8+
import org.junit.jupiter.api.extension.RegisterExtension;
9+
10+
import com.fasterxml.jackson.databind.JsonNode;
11+
12+
import io.quarkus.devui.tests.DevUIJsonRPCTest;
13+
import io.quarkus.test.QuarkusDevModeTest;
14+
15+
public class FlowCachedDescriptorDevUIJsonRPCTest extends DevUIJsonRPCTest {
16+
17+
@RegisterExtension
18+
static final QuarkusDevModeTest devMode = new QuarkusDevModeTest()
19+
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
20+
.addClasses(DevUIWorkflow.class, CachedDevUIDescriptorObserver.class));
21+
22+
public FlowCachedDescriptorDevUIJsonRPCTest() {
23+
super("quarkus-flow");
24+
}
25+
26+
@Test
27+
void shouldListOnlyDefinitionsFromDevUIBackend() throws Exception {
28+
JsonNode node = super.executeJsonRPCMethod("getWorkflows");
29+
assertEquals(2, node.size());
30+
}
31+
}

core/runtime/src/main/java/io/quarkiverse/flow/internal/WorkflowRegistry.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.quarkiverse.flow.internal;
22

33
import java.util.Collection;
4+
import java.util.LinkedHashMap;
45
import java.util.List;
56
import java.util.Map;
67
import java.util.Optional;
@@ -39,18 +40,26 @@ public class WorkflowRegistry {
3940
@Inject
4041
WorkflowApplication app;
4142

42-
private Map<WorkflowDefinitionId, Workflow> agenticCache = new ConcurrentHashMap<>();
43+
private Map<WorkflowDefinitionId, Workflow> descriptorCache = new ConcurrentHashMap<>();
4344

4445
public static WorkflowRegistry current() {
4546
return Arc.container().instance(WorkflowRegistry.class).get();
4647
}
4748

4849
public Collection<Workflow> all() {
49-
return app.workflowDefinitions().values().stream().map(WorkflowDefinition::workflow).toList();
50+
Map<WorkflowDefinitionId, Workflow> all = new LinkedHashMap<>(allDefinitionsMap());
51+
descriptorCache.forEach(all::putIfAbsent);
52+
return all.values();
53+
}
54+
55+
private Map<WorkflowDefinitionId, Workflow> allDefinitionsMap() {
56+
Map<WorkflowDefinitionId, Workflow> all = new LinkedHashMap<>();
57+
app.workflowDefinitions().forEach((id, definition) -> all.put(id, definition.workflow()));
58+
return all;
5059
}
5160

5261
public int count() {
53-
return app.workflowDefinitions().size();
62+
return all().size();
5463
}
5564

5665
public Optional<WorkflowDefinition> lookup(WorkflowDefinitionId id) {
@@ -59,17 +68,22 @@ public Optional<WorkflowDefinition> lookup(WorkflowDefinitionId id) {
5968

6069
public Optional<Workflow> lookupDescriptor(WorkflowDefinitionId id) {
6170
Optional<Workflow> workflow = lookup(id).map(WorkflowDefinition::workflow);
62-
return workflow.isPresent() ? workflow : Optional.ofNullable(agenticCache.get(id));
71+
return workflow.isPresent() ? workflow : Optional.ofNullable(descriptorCache.get(id));
6372
}
6473

6574
public WorkflowDefinition register(Flowable flowable) {
6675
LOG.info("Registering workflow {}", flowable.descriptor().getDocument().getName());
67-
return app.workflowDefinition(addFlowableMetadata(flowable));
76+
Workflow workflow = addFlowableMetadata(flowable);
77+
WorkflowDefinition definition = app.workflowDefinition(workflow);
78+
invalidateCachedDescriptor(definition);
79+
return definition;
6880
}
6981

7082
public WorkflowDefinition register(Workflow workflow) {
7183
LOG.info("Registering workflow {}", workflow.getDocument().getName());
72-
return app.workflowDefinition(workflow);
84+
WorkflowDefinition definition = app.workflowDefinition(workflow);
85+
invalidateCachedDescriptor(definition);
86+
return definition;
7387
}
7488

7589
private Workflow addFlowableMetadata(final Flowable flowable) {
@@ -98,6 +112,13 @@ void warmUp() {
98112

99113
public void cacheDescriptor(Workflow workflow) {
100114
LOG.debug("Caching workflow descriptor for {}", workflow.getDocument().getName());
101-
agenticCache.put(WorkflowDefinitionId.of(workflow), workflow);
115+
descriptorCache.put(WorkflowDefinitionId.of(workflow), workflow);
116+
}
117+
118+
private void invalidateCachedDescriptor(WorkflowDefinition definition) {
119+
WorkflowDefinitionId id = definition.id();
120+
if (descriptorCache.remove(id) != null) {
121+
LOG.debug("Invalidating cached workflow descriptor for {}", id.name());
122+
}
102123
}
103124
}

0 commit comments

Comments
 (0)