Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,4 @@ pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
release.properties
.m2/
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.quarkiverse.flow.deployment.test.devui;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

import io.quarkiverse.flow.internal.WorkflowRegistry;
import io.quarkus.runtime.StartupEvent;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.spec.WorkflowBuilder;

@ApplicationScoped
public class CachedDevUIDescriptorObserver {

void onStart(@Observes StartupEvent event, WorkflowRegistry registry) {
Workflow descriptor = WorkflowBuilder.workflow("cached-devui-workflow")
.tasks(t -> t.set("cachedTask", """
{ "message": "cached" }
"""))
.build();

registry.cacheDescriptor(descriptor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.quarkiverse.flow.deployment.test.devui;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.fasterxml.jackson.databind.JsonNode;

import io.quarkus.devui.tests.DevUIJsonRPCTest;
import io.quarkus.test.QuarkusDevModeTest;

public class FlowCachedDescriptorDevUIJsonRPCTest extends DevUIJsonRPCTest {

@RegisterExtension
static final QuarkusDevModeTest devMode = new QuarkusDevModeTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(DevUIWorkflow.class, CachedDevUIDescriptorObserver.class));

public FlowCachedDescriptorDevUIJsonRPCTest() {
super("quarkus-flow");
}

@Test
void shouldListOnlyDefinitionsFromDevUIBackend() throws Exception {
JsonNode node = super.executeJsonRPCMethod("getWorkflows");
assertEquals(2, node.size());
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkiverse.flow.internal;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -39,18 +40,26 @@ public class WorkflowRegistry {
@Inject
WorkflowApplication app;

private Map<WorkflowDefinitionId, Workflow> agenticCache = new ConcurrentHashMap<>();
private Map<WorkflowDefinitionId, Workflow> descriptorCache = new ConcurrentHashMap<>();

public static WorkflowRegistry current() {
return Arc.container().instance(WorkflowRegistry.class).get();
}

public Collection<Workflow> all() {
return app.workflowDefinitions().values().stream().map(WorkflowDefinition::workflow).toList();
Map<WorkflowDefinitionId, Workflow> all = new LinkedHashMap<>(allDefinitionsMap());
descriptorCache.forEach(all::putIfAbsent);
return all.values();
}

private Map<WorkflowDefinitionId, Workflow> allDefinitionsMap() {
Map<WorkflowDefinitionId, Workflow> all = new LinkedHashMap<>();
app.workflowDefinitions().forEach((id, definition) -> all.put(id, definition.workflow()));
return all;
}

public int count() {
return app.workflowDefinitions().size();
return all().size();
}

public Optional<WorkflowDefinition> lookup(WorkflowDefinitionId id) {
Expand All @@ -59,17 +68,22 @@ public Optional<WorkflowDefinition> lookup(WorkflowDefinitionId id) {

public Optional<Workflow> lookupDescriptor(WorkflowDefinitionId id) {
Optional<Workflow> workflow = lookup(id).map(WorkflowDefinition::workflow);
return workflow.isPresent() ? workflow : Optional.ofNullable(agenticCache.get(id));
return workflow.isPresent() ? workflow : Optional.ofNullable(descriptorCache.get(id));
}

public WorkflowDefinition register(Flowable flowable) {
LOG.info("Registering workflow {}", flowable.descriptor().getDocument().getName());
return app.workflowDefinition(addFlowableMetadata(flowable));
Workflow workflow = addFlowableMetadata(flowable);
WorkflowDefinition definition = app.workflowDefinition(workflow);
invalidateCachedDescriptor(definition);
return definition;
}

public WorkflowDefinition register(Workflow workflow) {
LOG.info("Registering workflow {}", workflow.getDocument().getName());
return app.workflowDefinition(workflow);
WorkflowDefinition definition = app.workflowDefinition(workflow);
invalidateCachedDescriptor(definition);
return definition;
}

private Workflow addFlowableMetadata(final Flowable flowable) {
Expand Down Expand Up @@ -98,6 +112,13 @@ void warmUp() {

public void cacheDescriptor(Workflow workflow) {
LOG.debug("Caching workflow descriptor for {}", workflow.getDocument().getName());
agenticCache.put(WorkflowDefinitionId.of(workflow), workflow);
descriptorCache.put(WorkflowDefinitionId.of(workflow), workflow);
}

private void invalidateCachedDescriptor(WorkflowDefinition definition) {
WorkflowDefinitionId id = definition.id();
if (descriptorCache.remove(id) != null) {
LOG.debug("Invalidating cached workflow descriptor for {}", id.name());
}
}
}
Loading