From 364d17129dee235e9868997b340ad13a222d37ad Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Tue, 3 Mar 2026 13:38:09 -0300 Subject: [PATCH] WIP: Add initial version of the how-to guide Signed-off-by: Matheus Cruz --- .../quarkiverse/flow/it/Call4PapersTest.java | 134 +++++ .../org/acme/dataflow/Call4PapersFlow.java | 108 ++++ docs/modules/ROOT/pages/data-flow.adoc | 2 + .../ROOT/pages/how-to-deal-with-state.adoc | 474 ++++++++++++++++++ 4 files changed, 718 insertions(+) create mode 100644 core/integration-tests/src/test/java/io/quarkiverse/flow/it/Call4PapersTest.java create mode 100644 docs/modules/ROOT/examples/org/acme/dataflow/Call4PapersFlow.java create mode 100644 docs/modules/ROOT/pages/how-to-deal-with-state.adoc diff --git a/core/integration-tests/src/test/java/io/quarkiverse/flow/it/Call4PapersTest.java b/core/integration-tests/src/test/java/io/quarkiverse/flow/it/Call4PapersTest.java new file mode 100644 index 00000000..cb9c8c10 --- /dev/null +++ b/core/integration-tests/src/test/java/io/quarkiverse/flow/it/Call4PapersTest.java @@ -0,0 +1,134 @@ +package io.quarkiverse.flow.it; + + +import io.quarkiverse.flow.Flow; +import io.quarkus.test.junit.QuarkusTest; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowModel; +import io.smallrye.mutiny.Uni; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.util.Objects; +import java.util.function.Function; + +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.http; + +@QuarkusTest +public class Call4PapersTest { + + @Inject + Call4PapersFlow flow; + + @Test + void should_execute_correctly() { + var submission = new Call4PapersFlow.ProposalSubmission( + "Reactive Workflows with Quarkus", + "This paper explores reactive workflow patterns...", + "Jane Developer" + ); + WorkflowModel workflowModel = flow.startInstance(submission).await().indefinitely(); + + Assertions.assertNotNull(workflowModel); + } + + @ApplicationScoped + public static class Call4PapersFlow extends Flow { + + @Override + public Workflow descriptor() { + return FuncWorkflowBuilder.workflow("call4papers") + .tasks( + // Step 1: Validate proposal with inputFrom transformation + function("validateProposal", (Proposal input) -> { + System.out.println("Validating proposal: " + input.title()); + return input; + }, Proposal.class) + .inputFrom((ProposalSubmission submission) -> new Proposal( + submission.title(), + submission.proposal(), // Maps to abstractText + submission.author()), ProposalSubmission.class), + + // Step 2: Score proposal with outputAs transformation + function("scoreProposal", (Proposal input) -> { + Integer score = calculateScore(input.abstractText()); + System.out.println("Score calculated having the result as: " + score); + return score; + }, Proposal.class) + .outputAs((Long score) -> { + return new ProposalScore(score, score >= 7); + }, Long.class), + + // Step 3: Prepare notification with exportAs using workflow context + function("prepareNotification", proposalScore -> new ProposalScore(proposalScore.score, proposalScore.accepted), ProposalScore.class) + .exportAs((ProposalScore proposalScore, WorkflowContextData workflowContext) -> { + // Access original workflow input to get title and author + WorkflowModel originalInput = workflowContext.instanceData().input(); + ProposalSubmission submission = originalInput.as(ProposalSubmission.class).orElseThrow(); + + // Create enriched payload combining original input with score + return new NotificationPayload( + submission.title(), + submission.author(), + proposalScore.score(), + proposalScore.accepted()); + }, ProposalScore.class).inputFrom(Function.identity(), ProposalScore.class), + + // Step 4: Send notification via HTTP + http("sendNotification") + .POST() + .body("${.}") // Uses the NotificationPayload from exportAs + .header("Content-Type", "application/json") + .uri(URI.create("http://localhost:9999/notifications"))) + .build(); + } + + /** + * Calculate a score for the proposal based on its abstract. + * In a real implementation, this might use NLP, keyword analysis, etc. + */ + private Integer calculateScore(String abstractText) { + // Simple scoring: longer abstracts get higher scores + int length = abstractText.length(); + if (length > 500) + return 9; + if (length > 300) + return 7; + if (length > 150) + return 5; + return 3; + } + + // Data types representing different stages of the workflow + + /** + * External DTO received from the API + */ + public record ProposalSubmission(String title, String proposal, String author) { + } + + /** + * Internal domain model used for processing + */ + public record Proposal(String title, String abstractText, String author) { + } + + /** + * Scoring result persisted in workflow data + */ + public record ProposalScore(long score, boolean accepted) { + } + + /** + * Final notification payload enriched with data from multiple sources + */ + public record NotificationPayload(String title, String author, long score, boolean accepted) { + } + } +} diff --git a/docs/modules/ROOT/examples/org/acme/dataflow/Call4PapersFlow.java b/docs/modules/ROOT/examples/org/acme/dataflow/Call4PapersFlow.java new file mode 100644 index 00000000..8a1a70c4 --- /dev/null +++ b/docs/modules/ROOT/examples/org/acme/dataflow/Call4PapersFlow.java @@ -0,0 +1,108 @@ +package org.acme.dataflow; + +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.http; + +import java.net.URI; + +import jakarta.enterprise.context.ApplicationScoped; + +import io.quarkiverse.flow.Flow; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowModel; + +@ApplicationScoped +public class Call4PapersFlow extends Flow { + + @Override + public Workflow descriptor() { + return FuncWorkflowBuilder.workflow("call4papers") + .tasks( + // Step 1: Validate proposal with inputFrom transformation + function("validateProposal", (Proposal input) -> { + System.out.println("Validating proposal: " + input.title()); + return input; + }, Proposal.class) + .inputFrom((ProposalSubmission submission) -> new Proposal( + submission.title(), + submission.proposal(), // Maps to abstractText + submission.author()), ProposalSubmission.class), + + // Step 2: Score proposal with outputAs transformation + function("scoreProposal", (Proposal input) -> { + int score = calculateScore(input.abstractText()); + return score; + }, Proposal.class) + .outputAs((Integer score) -> new ProposalScore(score, score >= 7), + Integer.class), + + // Step 3: Prepare notification with exportAs using workflow context + function("prepareNotification", (ProposalScore proposalScore) -> { + return proposalScore; // Pass through + }, ProposalScore.class) + .exportAs((ProposalScore proposalScore, WorkflowContextData workflowContext) -> { + // Access original workflow input to get title and author + WorkflowModel originalInput = workflowContext.instanceData().input(); + ProposalSubmission submission = originalInput.as(ProposalSubmission.class).orElseThrow(); + + // Create enriched payload combining original input with score + return new NotificationPayload( + submission.title(), + submission.author(), + proposalScore.score(), + proposalScore.accepted()); + }, ProposalScore.class), + + // Step 4: Send notification via HTTP + http("sendNotification") + .POST() + .body("${.}") // Uses the NotificationPayload from exportAs + .header("Content-Type", "application/json") + .uri(URI.create("http://localhost:9999/notifications"))) + .build(); + } + + /** + * Calculate a score for the proposal based on its abstract. + * In a real implementation, this might use NLP, keyword analysis, etc. + */ + private int calculateScore(String abstractText) { + // Simple scoring: longer abstracts get higher scores + int length = abstractText.length(); + if (length > 500) + return 9; + if (length > 300) + return 7; + if (length > 150) + return 5; + return 3; + } + + // Data types representing different stages of the workflow + + /** + * External DTO received from the API + */ + public record ProposalSubmission(String title, String proposal, String author) { + } + + /** + * Internal domain model used for processing + */ + public record Proposal(String title, String abstractText, String author) { + } + + /** + * Scoring result persisted in workflow data + */ + public record ProposalScore(int score, boolean accepted) { + } + + /** + * Final notification payload enriched with data from multiple sources + */ + public record NotificationPayload(String title, String author, int score, boolean accepted) { + } +} diff --git a/docs/modules/ROOT/pages/data-flow.adoc b/docs/modules/ROOT/pages/data-flow.adoc index ce7c9cea..66f33b04 100644 --- a/docs/modules/ROOT/pages/data-flow.adoc +++ b/docs/modules/ROOT/pages/data-flow.adoc @@ -116,6 +116,7 @@ Here you can: This is handy for logging, debugging, or adding metadata without polluting your business types. +[[export-as]] === 2.2 `exportAs` – pass data to the next step `exportAs` controls what the task **exports** for downstream consumers (the next @@ -266,6 +267,7 @@ Good when: You can freely mix JQ and Java across tasks in the same workflow. +[[quick-reference]] == 5. Quick reference * `inputFrom(String jq)` – slice workflow data using JQ. diff --git a/docs/modules/ROOT/pages/how-to-deal-with-state.adoc b/docs/modules/ROOT/pages/how-to-deal-with-state.adoc new file mode 100644 index 00000000..bb7f545d --- /dev/null +++ b/docs/modules/ROOT/pages/how-to-deal-with-state.adoc @@ -0,0 +1,474 @@ += How to transform data between workflow tasks + +include::includes/attributes.adoc[] +:page-role: howto +:sectnums: + +This guide shows you how to control data flow between tasks in a workflow using `inputFrom`, `exportAs`, and `outputAs` transformations. + +== What you'll build + +You'll create a conference paper submission workflow that: + +1. Receives a paper proposal submission +2. Validates and scores the proposal +3. Enriches the data with submission metadata +4. Sends a notification with the review result + +This demonstrates the three key data transformation methods: + +* `inputFrom` – to shape what each task receives as input +* `exportAs` – to pass enriched data between tasks without modifying workflow state +* `outputAs` – to control what gets persisted in the workflow data + +[TIP] +==== +For conceptual background on data flow, see xref:data-flow.adoc[Data Flow and Transformations]. +For a quick syntax reference, see xref:data-flow.adoc#quick-reference[Quick Reference]. +==== + +== Prerequisites + +* A Quarkus Flow project (see xref:getting-started.adoc[Getting Started]) +* Basic understanding of Java records and functional programming + +== Step 1: Define your data types + +Create the records that represent data at different stages of the workflow: + +[source,java] +---- +public record ProposalSubmission(String title, String proposal, String author) {} // <1> + +public record Proposal(String title, String abstractText, String author) {} // <2> + +public record ProposalScore(int score, boolean accepted) {} // <3> + +public record NotificationPayload(String title, String author, int score, boolean accepted) {} // <4> +---- +<1> The external data received from the API +<2> The internal domain model used for processing +<3> The scoring result +<4> The final notification payload + +== Step 2: Create the workflow with inputFrom + +Start by creating a workflow that transforms the input submission into your domain model: + +[source,java] +---- +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; + +@ApplicationScoped +public class Call4PapersFlow extends Flow { + + @Override + public Workflow descriptor() { + return FuncWorkflowBuilder.workflow("call4papers") + .tasks( + function("validateProposal", (Proposal input) -> { // <1> + System.out.println("Validating proposal: " + input.title()); + return input; + }, Proposal.class) + .inputFrom((ProposalSubmission submission) -> // <2> + new Proposal( + submission.title(), + submission.proposal(), // <3> + submission.author() + ), ProposalSubmission.class) // <4> + ) + .build(); + } + + public record ProposalSubmission(String title, String proposal, String author) {} + public record Proposal(String title, String abstractText, String author) {} +} +---- +<1> The task expects a `Proposal` as input +<2> Use `inputFrom` to transform the workflow input +<3> Map `proposal` field to `abstractText` field +<4> Specify the input type for the transformation + +[TIP] +==== +`inputFrom` isolates tasks from changes in the workflow data structure. The task only sees what it needs. +==== + +== Step 3: Add scoring with outputAs + +Add a task that scores the proposal and uses `outputAs` to control what gets saved to workflow state: + +[source,java] +---- +function("validateProposal", (Proposal input) -> { + System.out.println("Validating proposal: " + input.title()); + return input; +}, Proposal.class) + .inputFrom((ProposalSubmission submission) -> + new Proposal(submission.title(), submission.proposal(), submission.author()), + ProposalSubmission.class), + +function("scoreProposal", (Proposal input) -> { // <1> + int score = calculateScore(input.abstractText()); + return score; +}, Proposal.class) + .outputAs((Integer score) -> // <2> + new ProposalScore(score, score >= 7), // <3> + Integer.class) // <4> +---- +<1> Task returns a simple integer score +<2> Use `outputAs` to transform the result before saving to workflow data +<3> Create a richer object with both score and acceptance status +<4> Specify the input type (what the task returns) + +[IMPORTANT] +==== +`outputAs` determines what gets persisted in the workflow data document. This is what subsequent tasks can access from the workflow context. +==== + +== Step 4: Enrich data with exportAs and context + +Add a separate task that prepares the notification by enriching the score with data from the original workflow input: + +[source,java] +---- +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.http; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowModel; + +function("scoreProposal", (Proposal input) -> { + int score = calculateScore(input.abstractText()); + return score; +}, Proposal.class) + .outputAs((Integer score) -> + new ProposalScore(score, score >= 7), + Integer.class), + +function("prepareNotification", (ProposalScore proposalScore) -> { // <1> + return proposalScore; // Pass through +}, ProposalScore.class) + .exportAs((ProposalScore proposalScore, WorkflowContextData workflowContext) -> { // <2> + // Access the original workflow input + WorkflowModel originalInput = workflowContext.instanceData().input(); // <3> + ProposalSubmission submission = originalInput.as(ProposalSubmission.class).orElseThrow(); // <4> + + // Create enriched payload for next task + return new NotificationPayload( // <5> + submission.title(), + submission.author(), + proposalScore.score(), + proposalScore.accepted() + ); + }, ProposalScore.class), + +http("sendNotification") // <6> + .POST() + .body("${.}") // <7> + .header("Content-Type", "application/json") + .uri(URI.create("http://localhost:9999/notifications")) +---- +<1> New task receives the ProposalScore from previous task +<2> Use `exportAs` with workflow context to access original input +<3> Get the original workflow input (ProposalSubmission) +<4> Cast to the expected type with proper error handling +<5> Create enriched payload combining data from multiple sources +<6> HTTP task receives the enriched NotificationPayload +<7> The body will be the NotificationPayload as JSON + +[NOTE] +==== +`exportAs` shapes data for the next task without modifying the workflow data document. It's perfect for: + +* Enriching payloads with context from earlier in the workflow +* Preparing data for external APIs +* Passing technical metadata (tokens, correlation IDs) between tasks +==== + +== Step 5: Complete example + +Here's the complete workflow with all transformations: + +[source,java] +---- +include::{examples-dir}org/acme/dataflow/Call4PapersFlow.java[] +---- + +== Testing the workflow + +Start a workflow instance with a submission: + +[source,java] +---- +@Inject +Call4PapersFlow flow; + +public void submitProposal() { + var submission = new ProposalSubmission( + "Reactive Workflows with Quarkus", + "This paper explores reactive workflow patterns...", + "Jane Developer" + ); + + flow.startInstance(submission).await().indefinitely(); +} +---- + +The workflow will: + +1. Transform `ProposalSubmission` → `Proposal` (via `inputFrom`) +2. Score the proposal and save `ProposalScore` to workflow data (via `outputAs`) +3. Prepare notification by enriching score with original submission data (via `exportAs`) +4. Send HTTP notification with the enriched payload + +== Understanding the data flow + +At each stage, different data is available: + +[cols="1,2,2"] +|=== +|Stage |Workflow Data |Task Input + +|After `validateProposal` +|`Proposal` object +|`Proposal` (from `inputFrom`) + +|After `scoreProposal` +|`ProposalScore` object +|`Proposal` (from previous task) + +|After `prepareNotification` +|`ProposalScore` object (unchanged) +|`ProposalScore` (from previous task) + +|In `sendNotification` +|`ProposalScore` object +|`NotificationPayload` (from `exportAs`) +|=== + +== Advanced: Combining typed task output with workflow context + +The previous examples showed basic transformations. Now let's explore advanced patterns where you combine the typed task result with workflow context data. + +=== Using JavaContextFunction for type-safe access + +The `outputAs(JavaContextFunction function, Class argClass)` overload gives you: + +* `T` – The typed task result (e.g., `ProposalScore`) +* `WorkflowContextData` – Access to workflow input and metadata +* `V` – The return type (what gets saved to workflow data) + +[source,java] +---- +function("scoreProposal", (Proposal input) -> { + int score = calculateScore(input.abstractText()); + return score; +}, Proposal.class) + .outputAs((Integer score, WorkflowContextData workflowContext) -> { // <1> + // Access the original workflow input with type safety + WorkflowModel originalInput = workflowContext.instanceData().input(); + ProposalSubmission submission = originalInput.as(ProposalSubmission.class).orElseThrow(); // <2> + + // Combine task result with workflow input + return new EnrichedProposalScore( // <3> + submission.title(), + submission.author(), + score, + score >= 7, + java.time.Instant.now() + ); + }, Integer.class) // <4> + +public record EnrichedProposalScore( + String title, + String author, + int score, + boolean accepted, + Instant scoredAt +) {} +---- +<1> Use `JavaContextFunction` to access both the typed score and workflow context +<2> Type-safe access to the original workflow input +<3> Create a new object combining task result with workflow input data +<4> Specify the task result type (Integer) + +[TIP] +==== +This pattern is useful when you need to enrich the task result with data from the original workflow input without passing it through every task. +==== + +=== Using JavaFilterFunction for full context access + +The `outputAs(JavaFilterFunction function, Class argClass)` overload provides even more context: + +* `T` – The typed task result +* `WorkflowContextData` – Workflow-level context +* `TaskContextData` – Task-level context (input, raw output, position) +* `V` – The return type + +[source,java] +---- +import io.serverlessworkflow.impl.TaskContextData; + +function("scoreProposal", (Proposal input) -> { + int score = calculateScore(input.abstractText()); + return score; +}, Proposal.class) + .outputAs((Integer score, // <1> + WorkflowContextData workflowContext, // <2> + TaskContextData taskContext) -> { // <3> + + // Access original workflow input + WorkflowModel originalInput = workflowContext.instanceData().input(); + ProposalSubmission submission = originalInput.as(ProposalSubmission.class).orElseThrow(); + + // Access task-specific context + var taskInput = taskContext.input().as(Proposal.class).orElseThrow(); // <4> + String taskPosition = taskContext.position().jsonPointer(); // <5> + + // Create enriched result with metadata + return new DetailedProposalScore( + submission.title(), + submission.author(), + taskInput.abstractText().length(), // From task input + score, + score >= 7, + taskPosition, // e.g., "/do/1/task" + java.time.Instant.now() + ); + }, Integer.class) + +public record DetailedProposalScore( + String title, + String author, + int abstractLength, + int score, + boolean accepted, + String evaluatedAt, // Task position in workflow + Instant timestamp +) {} +---- +<1> Typed task result (Integer score) +<2> Workflow context for accessing original input +<3> Task context for accessing task-specific data +<4> Access what this specific task received as input (after `inputFrom`) +<5> Get the task's position in the workflow structure + +[IMPORTANT] +==== +`JavaFilterFunction` gives you the most complete picture: + +* **Task result** – What the function returned +* **Task input** – What the task received (after `inputFrom`) +* **Workflow input** – The original data passed to `startInstance()` +* **Task metadata** – Position, name, raw output +==== + +=== Type-safe vs dynamic access + +You can choose between type-safe and dynamic access patterns: + +[source,java] +---- +// Type-safe: Recommended for domain objects +.outputAs((Integer score, WorkflowContextData wf) -> { + ProposalSubmission submission = wf.instanceData() + .input() + .as(ProposalSubmission.class) + .orElseThrow(); // <1> + return new EnrichedScore(submission.title(), score); +}, Integer.class) + +// Dynamic: Useful for flexible/generic workflows +.outputAs((Integer score, WorkflowContextData wf) -> { + var inputMap = wf.instanceData() + .input() + .asMap() + .orElseThrow(); // <2> + return Map.of( + "title", inputMap.get("title"), + "score", score + ); +}, Integer.class) +---- +<1> Type-safe: Fails fast if structure doesn't match +<2> Dynamic: More flexible but loses compile-time safety + +[TIP] +==== +**When to use each approach:** + +* **Type-safe** (`as(MyClass.class)`) – For well-defined domain models, better IDE support, compile-time safety +* **Dynamic** (`asMap()`) – For generic workflows, external data, or when structure varies +==== + +=== Practical example: Audit trail + +Here's a real-world example combining everything: + +[source,java] +---- +function("scoreProposal", (Proposal input) -> { + int score = calculateScore(input.abstractText()); + return score; +}, Proposal.class) + .outputAs((Integer score, + WorkflowContextData workflowContext, + TaskContextData taskContext) -> { + + // Get original submission + ProposalSubmission submission = workflowContext.instanceData() + .input() + .as(ProposalSubmission.class) + .orElseThrow(); + + // Get workflow instance ID for audit + String instanceId = workflowContext.instanceData().id(); + + // Create audit trail + return new ScoredProposal( + instanceId, + submission.title(), + submission.author(), + score, + score >= 7, + taskContext.position().jsonPointer(), + java.time.Instant.now() + ); + }, Integer.class) + +public record ScoredProposal( + String workflowInstanceId, + String title, + String author, + int score, + boolean accepted, + String evaluatedBy, // Task position + Instant evaluatedAt +) {} +---- + +This creates a complete audit record combining: + +* Workflow instance tracking (instanceId) +* Original submission data (title, author) +* Evaluation result (score, accepted) +* Evaluation metadata (where and when) + +== Key takeaways + +* Use **`inputFrom`** to give tasks a focused, typed view of the data they need +* Use **`outputAs`** to control what gets persisted in the workflow state +* Use **`exportAs`** to enrich data for the next task without modifying workflow state +* Access workflow context in transformations to combine data from multiple sources + +== Next steps + +* Learn about context-aware transformations with `JavaFilterFunction` in xref:data-flow.adoc[Data Flow and Transformations] +* Explore JQ expressions as an alternative to Java functions +* See xref:data-flow.adoc#quick-reference[Quick Reference] for all transformation method signatures + +== Related guides + +* xref:data-flow.adoc[Data Flow and Transformations] – Conceptual overview +* xref:dsl-cheatsheet.adoc[Java DSL Cheatsheet] – Quick syntax reference +* xref:test-debug.adoc[Testing and Debugging] – How to test workflows \ No newline at end of file