Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package io.quarkiverse.flow.it;


import io.quarkiverse.flow.Flow;
import io.quarkus.test.junit.QuarkusTest;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;
import io.serverlessworkflow.impl.WorkflowContextData;
import io.serverlessworkflow.impl.WorkflowModel;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.net.URI;
import java.util.Objects;
import java.util.function.Function;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function;
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.http;

@QuarkusTest
public class Call4PapersTest {

@Inject
Call4PapersFlow flow;

@Test
void should_execute_correctly() {
var submission = new Call4PapersFlow.ProposalSubmission(
"Reactive Workflows with Quarkus",
"This paper explores reactive workflow patterns...",
"Jane Developer"
);
WorkflowModel workflowModel = flow.startInstance(submission).await().indefinitely();

Assertions.assertNotNull(workflowModel);
}

@ApplicationScoped
public static class Call4PapersFlow extends Flow {

@Override
public Workflow descriptor() {
return FuncWorkflowBuilder.workflow("call4papers")
.tasks(
// Step 1: Validate proposal with inputFrom transformation
function("validateProposal", (Proposal input) -> {
System.out.println("Validating proposal: " + input.title());
return input;
}, Proposal.class)
.inputFrom((ProposalSubmission submission) -> new Proposal(
submission.title(),
submission.proposal(), // Maps to abstractText
submission.author()), ProposalSubmission.class),

// Step 2: Score proposal with outputAs transformation
function("scoreProposal", (Proposal input) -> {
Integer score = calculateScore(input.abstractText());
System.out.println("Score calculated having the result as: " + score);
return score;
}, Proposal.class)
.outputAs((Long score) -> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @fjtirado the outputAs should extract the Long score from the task output or from the WorkflowContext#context?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking the documentation I got this one:

Transform Task Output After completing the task, its output can be transformed before passing it to the next task or storing it in the workflow context. Transformations are applied using the output.as runtime expression.

return new ProposalScore(score, score >= 7);
}, Long.class),

// Step 3: Prepare notification with exportAs using workflow context
function("prepareNotification", proposalScore -> new ProposalScore(proposalScore.score, proposalScore.accepted), ProposalScore.class)
.exportAs((ProposalScore proposalScore, WorkflowContextData workflowContext) -> {
// Access original workflow input to get title and author
WorkflowModel originalInput = workflowContext.instanceData().input();
ProposalSubmission submission = originalInput.as(ProposalSubmission.class).orElseThrow();

// Create enriched payload combining original input with score
return new NotificationPayload(
submission.title(),
submission.author(),
proposalScore.score(),
proposalScore.accepted());
}, ProposalScore.class).inputFrom(Function.identity(), ProposalScore.class),

// Step 4: Send notification via HTTP
http("sendNotification")
.POST()
.body("${.}") // Uses the NotificationPayload from exportAs
.header("Content-Type", "application/json")
.uri(URI.create("http://localhost:9999/notifications")))
.build();
}

/**
* Calculate a score for the proposal based on its abstract.
* In a real implementation, this might use NLP, keyword analysis, etc.
*/
private Integer calculateScore(String abstractText) {
// Simple scoring: longer abstracts get higher scores
int length = abstractText.length();
if (length > 500)
return 9;
if (length > 300)
return 7;
if (length > 150)
return 5;
return 3;
}

// Data types representing different stages of the workflow

/**
* External DTO received from the API
*/
public record ProposalSubmission(String title, String proposal, String author) {
}

/**
* Internal domain model used for processing
*/
public record Proposal(String title, String abstractText, String author) {
}

/**
* Scoring result persisted in workflow data
*/
public record ProposalScore(long score, boolean accepted) {
}

/**
* Final notification payload enriched with data from multiple sources
*/
public record NotificationPayload(String title, String author, long score, boolean accepted) {
}
}
}
108 changes: 108 additions & 0 deletions docs/modules/ROOT/examples/org/acme/dataflow/Call4PapersFlow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.acme.dataflow;

import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function;
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.http;

import java.net.URI;

import jakarta.enterprise.context.ApplicationScoped;

import io.quarkiverse.flow.Flow;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;
import io.serverlessworkflow.impl.WorkflowContextData;
import io.serverlessworkflow.impl.WorkflowModel;

@ApplicationScoped
public class Call4PapersFlow extends Flow {

@Override
public Workflow descriptor() {
return FuncWorkflowBuilder.workflow("call4papers")
.tasks(
// Step 1: Validate proposal with inputFrom transformation
function("validateProposal", (Proposal input) -> {
System.out.println("Validating proposal: " + input.title());
return input;
}, Proposal.class)
.inputFrom((ProposalSubmission submission) -> new Proposal(
submission.title(),
submission.proposal(), // Maps to abstractText
submission.author()), ProposalSubmission.class),

// Step 2: Score proposal with outputAs transformation
function("scoreProposal", (Proposal input) -> {
int score = calculateScore(input.abstractText());
return score;
}, Proposal.class)
.outputAs((Integer score) -> new ProposalScore(score, score >= 7),
Integer.class),

// Step 3: Prepare notification with exportAs using workflow context
function("prepareNotification", (ProposalScore proposalScore) -> {
return proposalScore; // Pass through
}, ProposalScore.class)
.exportAs((ProposalScore proposalScore, WorkflowContextData workflowContext) -> {
// Access original workflow input to get title and author
WorkflowModel originalInput = workflowContext.instanceData().input();
ProposalSubmission submission = originalInput.as(ProposalSubmission.class).orElseThrow();

// Create enriched payload combining original input with score
return new NotificationPayload(
submission.title(),
submission.author(),
proposalScore.score(),
proposalScore.accepted());
}, ProposalScore.class),

// Step 4: Send notification via HTTP
http("sendNotification")
.POST()
.body("${.}") // Uses the NotificationPayload from exportAs
.header("Content-Type", "application/json")
.uri(URI.create("http://localhost:9999/notifications")))
.build();
}

/**
* Calculate a score for the proposal based on its abstract.
* In a real implementation, this might use NLP, keyword analysis, etc.
*/
private int calculateScore(String abstractText) {
// Simple scoring: longer abstracts get higher scores
int length = abstractText.length();
if (length > 500)
return 9;
if (length > 300)
return 7;
if (length > 150)
return 5;
return 3;
}

// Data types representing different stages of the workflow

/**
* External DTO received from the API
*/
public record ProposalSubmission(String title, String proposal, String author) {
}

/**
* Internal domain model used for processing
*/
public record Proposal(String title, String abstractText, String author) {
}

/**
* Scoring result persisted in workflow data
*/
public record ProposalScore(int score, boolean accepted) {
}

/**
* Final notification payload enriched with data from multiple sources
*/
public record NotificationPayload(String title, String author, int score, boolean accepted) {
}
}
2 changes: 2 additions & 0 deletions docs/modules/ROOT/pages/data-flow.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ Here you can:
This is handy for logging, debugging, or adding metadata without polluting your
business types.

[[export-as]]
=== 2.2 `exportAs` – pass data to the next step

`exportAs` controls what the task **exports** for downstream consumers (the next
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exportAs controls what is set in the shared workflow context, I think this description is kind of confusing
outputAs control what is set for the next task.

Expand Down Expand Up @@ -266,6 +267,7 @@ Good when:

You can freely mix JQ and Java across tasks in the same workflow.

[[quick-reference]]
== 5. Quick reference

* `inputFrom(String jq)` – slice workflow data using JQ.
Expand Down
Loading
Loading