diff --git a/.gitignore b/.gitignore index a19c3c0c..26c6f2b5 100644 --- a/.gitignore +++ b/.gitignore @@ -62,3 +62,4 @@ pom.xml.tag pom.xml.releaseBackup pom.xml.versionsBackup release.properties +.m2/ diff --git a/core/deployment/src/test/java/io/quarkiverse/flow/deployment/test/devui/CachedDevUIDescriptorObserver.java b/core/deployment/src/test/java/io/quarkiverse/flow/deployment/test/devui/CachedDevUIDescriptorObserver.java new file mode 100644 index 00000000..67d31870 --- /dev/null +++ b/core/deployment/src/test/java/io/quarkiverse/flow/deployment/test/devui/CachedDevUIDescriptorObserver.java @@ -0,0 +1,23 @@ +package io.quarkiverse.flow.deployment.test.devui; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; + +import io.quarkiverse.flow.internal.WorkflowRegistry; +import io.quarkus.runtime.StartupEvent; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; + +@ApplicationScoped +public class CachedDevUIDescriptorObserver { + + void onStart(@Observes StartupEvent event, WorkflowRegistry registry) { + Workflow descriptor = WorkflowBuilder.workflow("cached-devui-workflow") + .tasks(t -> t.set("cachedTask", """ + { "message": "cached" } + """)) + .build(); + + registry.cacheDescriptor(descriptor); + } +} diff --git a/core/deployment/src/test/java/io/quarkiverse/flow/deployment/test/devui/FlowCachedDescriptorDevUIJsonRPCTest.java b/core/deployment/src/test/java/io/quarkiverse/flow/deployment/test/devui/FlowCachedDescriptorDevUIJsonRPCTest.java new file mode 100644 index 00000000..495e90c2 --- /dev/null +++ b/core/deployment/src/test/java/io/quarkiverse/flow/deployment/test/devui/FlowCachedDescriptorDevUIJsonRPCTest.java @@ -0,0 +1,31 @@ +package io.quarkiverse.flow.deployment.test.devui; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.fasterxml.jackson.databind.JsonNode; + +import io.quarkus.devui.tests.DevUIJsonRPCTest; +import io.quarkus.test.QuarkusDevModeTest; + +public class FlowCachedDescriptorDevUIJsonRPCTest extends DevUIJsonRPCTest { + + @RegisterExtension + static final QuarkusDevModeTest devMode = new QuarkusDevModeTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(DevUIWorkflow.class, CachedDevUIDescriptorObserver.class)); + + public FlowCachedDescriptorDevUIJsonRPCTest() { + super("quarkus-flow"); + } + + @Test + void shouldListOnlyDefinitionsFromDevUIBackend() throws Exception { + JsonNode node = super.executeJsonRPCMethod("getWorkflows"); + assertEquals(2, node.size()); + } +} diff --git a/core/runtime/src/main/java/io/quarkiverse/flow/internal/WorkflowRegistry.java b/core/runtime/src/main/java/io/quarkiverse/flow/internal/WorkflowRegistry.java index 6cee19ab..1a200b32 100644 --- a/core/runtime/src/main/java/io/quarkiverse/flow/internal/WorkflowRegistry.java +++ b/core/runtime/src/main/java/io/quarkiverse/flow/internal/WorkflowRegistry.java @@ -1,6 +1,7 @@ package io.quarkiverse.flow.internal; import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -39,18 +40,26 @@ public class WorkflowRegistry { @Inject WorkflowApplication app; - private Map agenticCache = new ConcurrentHashMap<>(); + private Map descriptorCache = new ConcurrentHashMap<>(); public static WorkflowRegistry current() { return Arc.container().instance(WorkflowRegistry.class).get(); } public Collection all() { - return app.workflowDefinitions().values().stream().map(WorkflowDefinition::workflow).toList(); + Map all = new LinkedHashMap<>(allDefinitionsMap()); + descriptorCache.forEach(all::putIfAbsent); + return all.values(); + } + + private Map allDefinitionsMap() { + Map all = new LinkedHashMap<>(); + app.workflowDefinitions().forEach((id, definition) -> all.put(id, definition.workflow())); + return all; } public int count() { - return app.workflowDefinitions().size(); + return all().size(); } public Optional lookup(WorkflowDefinitionId id) { @@ -59,17 +68,22 @@ public Optional lookup(WorkflowDefinitionId id) { public Optional lookupDescriptor(WorkflowDefinitionId id) { Optional workflow = lookup(id).map(WorkflowDefinition::workflow); - return workflow.isPresent() ? workflow : Optional.ofNullable(agenticCache.get(id)); + return workflow.isPresent() ? workflow : Optional.ofNullable(descriptorCache.get(id)); } public WorkflowDefinition register(Flowable flowable) { LOG.info("Registering workflow {}", flowable.descriptor().getDocument().getName()); - return app.workflowDefinition(addFlowableMetadata(flowable)); + Workflow workflow = addFlowableMetadata(flowable); + WorkflowDefinition definition = app.workflowDefinition(workflow); + invalidateCachedDescriptor(definition); + return definition; } public WorkflowDefinition register(Workflow workflow) { LOG.info("Registering workflow {}", workflow.getDocument().getName()); - return app.workflowDefinition(workflow); + WorkflowDefinition definition = app.workflowDefinition(workflow); + invalidateCachedDescriptor(definition); + return definition; } private Workflow addFlowableMetadata(final Flowable flowable) { @@ -98,6 +112,13 @@ void warmUp() { public void cacheDescriptor(Workflow workflow) { LOG.debug("Caching workflow descriptor for {}", workflow.getDocument().getName()); - agenticCache.put(WorkflowDefinitionId.of(workflow), workflow); + descriptorCache.put(WorkflowDefinitionId.of(workflow), workflow); + } + + private void invalidateCachedDescriptor(WorkflowDefinition definition) { + WorkflowDefinitionId id = definition.id(); + if (descriptorCache.remove(id) != null) { + LOG.debug("Invalidating cached workflow descriptor for {}", id.name()); + } } }