diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index e18f646a..949e9192 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -25,17 +25,6 @@ jobs: uses: actions/setup-java@v2 with: java-version: '11' - distribution: 'microsoft' - - - name: set JDK_11 environment variable test compiling and running - env: - ACTIONS_ALLOW_UNSECURE_COMMANDS: true - run: echo ::set-env name=JDK_11::$(echo $JAVA_HOME) - - - name: Set up JDK 8 - uses: actions/setup-java@v2 - with: - java-version: '8' distribution: 'temurin' - name: Setup Gradle @@ -57,7 +46,6 @@ jobs: - name: Run Unit Tests with Gradle run: | - export JAVA_HOME=$JDK_11 ./gradlew clean test || echo "UNIT_TEST_FAILED=true" >> $GITHUB_ENV continue-on-error: true diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index fd898fa7..66140126 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,5 +1,52 @@ +Verify Compilation: +```shell +./gradlew compileJava +``` + Build & test: ```shell ./gradlew build -``` \ No newline at end of file +``` + +## Verify Build Components + +To ensure all build components work correctly after making changes: + +```shell +# Build without tests +./gradlew build -x test + +# Run unit tests +./gradlew test + +# Run SpotBugs +./gradlew spotbugsMain spotbugsTest +``` + +## Updating Protobuf Definitions + +When updating the protobuf definitions in `internal/durabletask-protobuf/protos/orchestrator_service.proto`: + +1. Manually copy the updated protobuf file from dapr/durabletask-protobuf +2. Update the commit hash in `internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH` to reflect the new commit +3. Regenerate the Java classes from the protobuf definitions: + +```shell +./gradlew generateProto +``` + +## Test locally from dapr/java-sdk + +```shell +./gradlew publishToMavenLocal +``` + +or simply `./gradlew build publishToMavenLocal` + +Check if it was released locally with: +```shell +ls ~/.m2/repository/io/dapr/durabletask-client/ +``` + +Then update the durabletask-client in the java-sdk pom.xml file to the newest version you released locally. \ No newline at end of file diff --git a/README.md b/README.md index c816022f..e3e59edd 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,15 @@ # Durable Task Client SDK for Java -[![Build](https://github.com/microsoft/durabletask-java/actions/workflows/build-validation.yml/badge.svg)](https://github.com/microsoft/durabletask-java/actions/workflows/build-validation.yml) +[![Build](https://github.com/dapr/durabletask-java/actions/workflows/build-validation.yml/badge.svg)](https://github.com/dapr/durabletask-java/actions/workflows/build-validation.yml) [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) This repo contains the Java SDK for the Durable Task Framework as well as classes and annotations to support running [Azure Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview?tabs=java) for Java. With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Java code. +## Requirements + +- **Java 11 or higher** - This SDK requires Java 11 as the minimum version +- **Gradle** - The project uses [Gradle 7.6.4](gradle/wrapper/gradle-wrapper.properties) + ### Simple, fault-tolerant sequences ```java @@ -78,8 +83,7 @@ The following packages are produced from this repo. | Package | Latest version | | - | - | -| Durable Task - Client | [![Maven Central](https://img.shields.io/maven-central/v/com.microsoft/durabletask-client?label=durabletask-client)](https://mvnrepository.com/artifact/com.microsoft/durabletask-client/1.0.0) | -| Durable Task - Azure Functions | [![Maven Central](https://img.shields.io/maven-central/v/com.microsoft/durabletask-azure-functions?label=durabletask-azure-functions)](https://mvnrepository.com/artifact/com.microsoft/durabletask-azure-functions/1.0.1) | +| Durable Task - Client | [![Maven Central](https://img.shields.io/maven-central/v/io.dapr/durabletask-client?label=durabletask-client)](https://mvnrepository.com/artifact/io.dapr/durabletask-client/1.5.7) | ## Getting started with Azure Functions diff --git a/azurefunctions/README.md b/azurefunctions/README.md index 132daa00..43ff0ffd 100644 --- a/azurefunctions/README.md +++ b/azurefunctions/README.md @@ -6,7 +6,7 @@ In this article, you follow steps to create and run a simple azure durable funct The following are the requirements for you local environment: -- The Java Developer Kit, version 8 or 11, is required. Between the two, JDK 11 is recommended. +- The Java Developer Kit, version 11 or higher, is required. - The `JAVA_HOME` environment variable must be set to the install location of the correct version of the JDK. - [Apache Maven](https://maven.apache.org/), version 3.0 or above for azure function app creation, is required for using automatic project creation tools. - If Maven isn't your preferred development tool, check out our similar tutorials to [create a function app](https://docs.microsoft.com/en-us/azure/azure-functions/create-first-function-cli-java?tabs=bash%2Cazure-cli%2Cbrowser). This README also contains instructions for [Gradle](https://gradle.org/). diff --git a/client/build.gradle b/client/build.gradle index f4494020..bab0b189 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -11,22 +11,20 @@ plugins { } group 'io.dapr' -version = '1.5.7' +version = '1.5.8' archivesBaseName = 'durabletask-client' def grpcVersion = '1.69.0' def protocVersion = '3.25.5' def jacksonVersion = '2.15.3' -// When build on local, you need to set this value to your local jdk11 directory. -// Java11 is used to compile and run all the tests. -// Example for Windows: C:/Program Files/Java/openjdk-11.0.12_7/ -def PATH_TO_TEST_JAVA_RUNTIME = System.env.JDK_11 ?: System.getProperty("java.home") +// Java 11 is now the minimum required version for both compilation and testing dependencies { // https://github.com/grpc/grpc-java#download implementation "io.grpc:grpc-protobuf:${grpcVersion}" implementation "io.grpc:grpc-stub:${grpcVersion}" + implementation 'com.google.protobuf:protobuf-java:3.25.5' runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}" compileOnly "org.apache.tomcat:annotations-api:6.0.53" @@ -44,12 +42,12 @@ dependencies { // Netty dependencies for TLS implementation "io.grpc:grpc-netty-shaded:${grpcVersion}" - implementation "io.netty:netty-handler:4.1.94.Final" + implementation 'io.netty:netty-handler:4.1.119.Final' implementation "io.netty:netty-tcnative-boringssl-static:2.0.59.Final" // Add Netty dependencies to test classpath testImplementation "io.grpc:grpc-netty-shaded:${grpcVersion}" - testImplementation "io.netty:netty-handler:4.1.94.Final" + testImplementation 'io.netty:netty-handler:4.1.119.Final' testImplementation "io.netty:netty-tcnative-boringssl-static:2.0.59.Final" testImplementation 'org.bouncycastle:bcprov-jdk15on:1.70' @@ -57,14 +55,12 @@ dependencies { } compileJava { - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 } compileTestJava { sourceCompatibility = JavaVersion.VERSION_11 targetCompatibility = JavaVersion.VERSION_11 - options.fork = true - options.forkOptions.executable = "${PATH_TO_TEST_JAVA_RUNTIME}/bin/javac" } task downloadProtoFiles { @@ -72,21 +68,33 @@ task downloadProtoFiles { doLast { def protoDir = file("${rootProject.projectDir}/internal/durabletask-protobuf/protos") + def protoFile = new File(protoDir, 'orchestrator_service.proto') + def commitHashFile = new File("${rootProject.projectDir}/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH") protoDir.mkdirs() // Download the proto file new URL("https://raw.githubusercontent.com/dapr/durabletask-protobuf/${ext.branch}/protos/orchestrator_service.proto") .withInputStream { i -> - new File(protoDir, 'orchestrator_service.proto').withOutputStream { it << i } + protoFile.withOutputStream { it << i } } - - // Get and save the commit hash - def commitHashFile = new File("${rootProject.projectDir}/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH") - def commitApiUrl = new URL("https://api.github.com/repos/dapr/durabletask-protobuf/commits?path=protos/orchestrator_service.proto&sha=${ext.branch}&per_page=1") - def connection = commitApiUrl.openConnection() - connection.setRequestProperty('Accept', 'application/vnd.github.v3+json') - def commitHash = new groovy.json.JsonSlurper().parse(connection.inputStream)[0].sha - commitHashFile.text = commitHash + + try { + def commitApiUrl = new URL("https://api.github.com/repos/dapr/durabletask-protobuf/commits?path=protos/orchestrator_service.proto&sha=${ext.branch}&per_page=1") + def connection = commitApiUrl.openConnection() + connection.setRequestProperty('Accept', 'application/vnd.github.v3+json') + connection.setRequestProperty('User-Agent', 'durabletask-java-build') + + // Add GitHub token if available (automatically provided in GitHub Actions) + if (System.env.GITHUB_TOKEN) { + connection.setRequestProperty('Authorization', "token ${System.env.GITHUB_TOKEN}") + } + + def commitHash = new groovy.json.JsonSlurper().parse(connection.inputStream)[0].sha + commitHashFile.text = commitHash + logger.info("Successfully updated proto commit hash: ${commitHash}") + } catch (Exception e) { + logger.error("Failed to fetch commit hash from GitHub API: ${e.message}") + } } } @@ -115,10 +123,6 @@ sourceSets { } } -tasks.withType(Test) { - executable = new File("${PATH_TO_TEST_JAVA_RUNTIME}", 'bin/java') -} - test { useJUnitPlatform { // Skip tests tagged as "integration" since those are slower @@ -208,6 +212,10 @@ java { withJavadocJar() } +tasks.named('sourcesJar').configure { + dependsOn tasks.named('generateProto') +} + spotbugs { toolVersion = '4.9.2' effort = 'max' diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index d1f3361e..d9e3bc65 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -36,6 +36,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final DataConverter dataConverter; private final Duration maximumTimerInterval; private final ExecutorService workerPool; + private final String appId; // App ID for cross-app routing private final TaskHubSidecarServiceBlockingStub sidecarClient; private final boolean isExecutorServiceManaged; @@ -45,6 +46,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) { this.orchestrationFactories.putAll(builder.orchestrationFactories); this.activityFactories.putAll(builder.activityFactories); + this.appId = builder.appId; Channel sidecarGrpcChannel; if (builder.channel != null) { @@ -128,7 +130,8 @@ public void startAndBlock() { this.orchestrationFactories, this.dataConverter, this.maximumTimerInterval, - logger); + logger, + this.appId); TaskActivityExecutor taskActivityExecutor = new TaskActivityExecutor( this.activityFactories, this.dataConverter, @@ -143,6 +146,9 @@ public void startAndBlock() { RequestCase requestType = workItem.getRequestCase(); if (requestType == RequestCase.ORCHESTRATORREQUEST) { OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest(); + logger.log(Level.FINEST, + String.format("Processing orchestrator request for instance: {0}", + orchestratorRequest.getInstanceId())); // TODO: Error handling this.workerPool.submit(() -> { @@ -159,6 +165,9 @@ public void startAndBlock() { try { this.sidecarClient.completeOrchestratorTask(response); + logger.log(Level.FINEST, + "Completed orchestrator request for instance: {0}", + orchestratorRequest.getInstanceId()); } catch (StatusRuntimeException e) { if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { logger.log(Level.WARNING, @@ -177,7 +186,12 @@ public void startAndBlock() { }); } else if (requestType == RequestCase.ACTIVITYREQUEST) { ActivityRequest activityRequest = workItem.getActivityRequest(); + logger.log(Level.FINEST, + String.format("Processing activity request: %s for instance: %s}", + activityRequest.getName(), + activityRequest.getOrchestrationInstance().getInstanceId())); + // TODO: Error handling this.workerPool.submit(() -> { String output = null; TaskFailureDetails failureDetails = null; @@ -228,7 +242,8 @@ public void startAndBlock() { } else if (requestType == RequestCase.HEALTHPING) { // No-op } else { - logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", + logger.log(Level.WARNING, + "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType); } } diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java index e183648e..8ef12866 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java @@ -1,137 +1,153 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package io.dapr.durabletask; - -import io.grpc.Channel; - -import java.time.Duration; -import java.util.HashMap; -import java.util.concurrent.ExecutorService; - - -/** - * Builder object for constructing customized {@link DurableTaskGrpcWorker} instances. - */ -public final class DurableTaskGrpcWorkerBuilder { - final HashMap orchestrationFactories = new HashMap<>(); - final HashMap activityFactories = new HashMap<>(); - int port; - Channel channel; - DataConverter dataConverter; - Duration maximumTimerInterval; - ExecutorService executorService; - - /** - * Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}. - * - * @param factory an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker} - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) { - String key = factory.getName(); - if (key == null || key.length() == 0) { - throw new IllegalArgumentException("A non-empty task orchestration name is required."); - } - - if (this.orchestrationFactories.containsKey(key)) { - throw new IllegalArgumentException( - String.format("A task orchestration factory named %s is already registered.", key)); - } - - this.orchestrationFactories.put(key, factory); - return this; - } - - /** - * Adds an activity factory to be used by the constructed {@link DurableTaskGrpcWorker}. - * - * @param factory an activity factory to be used by the constructed {@link DurableTaskGrpcWorker} - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder addActivity(TaskActivityFactory factory) { - // TODO: Input validation - String key = factory.getName(); - if (key == null || key.length() == 0) { - throw new IllegalArgumentException("A non-empty task activity name is required."); - } - - if (this.activityFactories.containsKey(key)) { - throw new IllegalArgumentException( - String.format("A task activity factory named %s is already registered.", key)); - } - - this.activityFactories.put(key, factory); - return this; - } - - /** - * Sets the gRPC channel to use for communicating with the sidecar process. - *

- * This builder method allows you to provide your own gRPC channel for communicating with the Durable Task sidecar - * endpoint. Channels provided using this method won't be closed when the worker is closed. - * Rather, the caller remains responsible for shutting down the channel after disposing the worker. - *

- * If not specified, a gRPC channel will be created automatically for each constructed - * {@link DurableTaskGrpcWorker}. - * - * @param channel the gRPC channel to use - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder grpcChannel(Channel channel) { - this.channel = channel; - return this; - } - - /** - * Sets the gRPC endpoint port to connect to. If not specified, the default Durable Task port number will be used. - * - * @param port the gRPC endpoint port to connect to - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder port(int port) { - this.port = port; - return this; - } - - /** - * Sets the {@link DataConverter} to use for converting serializable data payloads. - * - * @param dataConverter the {@link DataConverter} to use for converting serializable data payloads - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder dataConverter(DataConverter dataConverter) { - this.dataConverter = dataConverter; - return this; - } - - /** - * Sets the maximum timer interval. If not specified, the default maximum timer interval duration will be used. - * The default maximum timer interval duration is 3 days. - * - * @param maximumTimerInterval the maximum timer interval - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerInterval) { - this.maximumTimerInterval = maximumTimerInterval; - return this; - } - - /** - * Sets the executor service that will be used to execute threads. - * - * @param executorService {@link ExecutorService}. - * @return this builder object. - */ - public DurableTaskGrpcWorkerBuilder withExecutorService(ExecutorService executorService) { - this.executorService = executorService; - return this; - } - - /** - * Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object. - * @return a new {@link DurableTaskGrpcWorker} object - */ - public DurableTaskGrpcWorker build() { - return new DurableTaskGrpcWorker(this); - } -} +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package io.dapr.durabletask; + +import io.grpc.Channel; + +import java.time.Duration; +import java.util.HashMap; +import java.util.concurrent.ExecutorService; + + +/** + * Builder object for constructing customized {@link DurableTaskGrpcWorker} instances. + */ +public final class DurableTaskGrpcWorkerBuilder { + final HashMap orchestrationFactories = new HashMap<>(); + final HashMap activityFactories = new HashMap<>(); + int port; + Channel channel; + DataConverter dataConverter; + Duration maximumTimerInterval; + ExecutorService executorService; + String appId; // App ID for cross-app routing + + /** + * Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}. + * + * @param factory an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker} + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) { + String key = factory.getName(); + if (key == null || key.length() == 0) { + throw new IllegalArgumentException("A non-empty task orchestration name is required."); + } + + if (this.orchestrationFactories.containsKey(key)) { + throw new IllegalArgumentException( + String.format("A task orchestration factory named %s is already registered.", key)); + } + + this.orchestrationFactories.put(key, factory); + return this; + } + + /** + * Adds an activity factory to be used by the constructed {@link DurableTaskGrpcWorker}. + * + * @param factory an activity factory to be used by the constructed {@link DurableTaskGrpcWorker} + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder addActivity(TaskActivityFactory factory) { + // TODO: Input validation + String key = factory.getName(); + if (key == null || key.length() == 0) { + throw new IllegalArgumentException("A non-empty task activity name is required."); + } + + if (this.activityFactories.containsKey(key)) { + throw new IllegalArgumentException( + String.format("A task activity factory named %s is already registered.", key)); + } + + this.activityFactories.put(key, factory); + return this; + } + + /** + * Sets the gRPC channel to use for communicating with the sidecar process. + *

+ * This builder method allows you to provide your own gRPC channel for communicating with the Durable Task sidecar + * endpoint. Channels provided using this method won't be closed when the worker is closed. + * Rather, the caller remains responsible for shutting down the channel after disposing the worker. + *

+ * If not specified, a gRPC channel will be created automatically for each constructed + * {@link DurableTaskGrpcWorker}. + * + * @param channel the gRPC channel to use + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder grpcChannel(Channel channel) { + this.channel = channel; + return this; + } + + /** + * Sets the gRPC endpoint port to connect to. If not specified, the default Durable Task port number will be used. + * + * @param port the gRPC endpoint port to connect to + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder port(int port) { + this.port = port; + return this; + } + + /** + * Sets the {@link DataConverter} to use for converting serializable data payloads. + * + * @param dataConverter the {@link DataConverter} to use for converting serializable data payloads + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder dataConverter(DataConverter dataConverter) { + this.dataConverter = dataConverter; + return this; + } + + /** + * Sets the maximum timer interval. If not specified, the default maximum timer interval duration will be used. + * The default maximum timer interval duration is 3 days. + * + * @param maximumTimerInterval the maximum timer interval + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerInterval) { + this.maximumTimerInterval = maximumTimerInterval; + return this; + } + + /** + * Sets the executor service that will be used to execute threads. + * + * @param executorService {@link ExecutorService}. + * @return this builder object. + */ + public DurableTaskGrpcWorkerBuilder withExecutorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + + /** + * Sets the app ID for cross-app workflow routing. + *

+ * This app ID is used to identify this worker in cross-app routing scenarios. + * It should match the app ID configured in the Dapr sidecar. + *

+ * + * @param appId the app ID for this worker + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder appId(String appId) { + this.appId = appId; + return this; + } + + /** + * Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object. + * @return a new {@link DurableTaskGrpcWorker} object + */ + public DurableTaskGrpcWorker build() { + return new DurableTaskGrpcWorker(this); + } +} diff --git a/client/src/main/java/io/dapr/durabletask/NewOrchestrationInstanceOptions.java b/client/src/main/java/io/dapr/durabletask/NewOrchestrationInstanceOptions.java index 53c97396..bb861af9 100644 --- a/client/src/main/java/io/dapr/durabletask/NewOrchestrationInstanceOptions.java +++ b/client/src/main/java/io/dapr/durabletask/NewOrchestrationInstanceOptions.java @@ -12,6 +12,7 @@ public final class NewOrchestrationInstanceOptions { private String instanceId; private Object input; private Instant startTime; + private String appID; // Target app ID for cross-app workflow routing /** * Default constructor for the {@link NewOrchestrationInstanceOptions} class. @@ -71,6 +72,17 @@ public NewOrchestrationInstanceOptions setStartTime(Instant startTime) { return this; } + /** + * Sets the target app ID for cross-app workflow routing. + * + * @param appID the target app ID for cross-app routing + * @return this {@link NewOrchestrationInstanceOptions} object + */ + public NewOrchestrationInstanceOptions setAppID(String appID) { + this.appID = appID; + return this; + } + /** * Gets the user-specified version of the new orchestration. * @@ -106,4 +118,22 @@ public Object getInput() { public Instant getStartTime() { return this.startTime; } + + /** + * Gets the configured target app ID for cross-app workflow routing. + * + * @return the configured target app ID + */ + public String getAppID() { + return this.appID; + } + + /** + * Checks if an app ID is configured for cross-app routing. + * + * @return true if an app ID is configured, false otherwise + */ + public boolean hasAppID() { + return this.appID != null && !this.appID.isEmpty(); + } } diff --git a/client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java b/client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java index de6530f9..4f327c2a 100644 --- a/client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java +++ b/client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java @@ -129,7 +129,8 @@ public TaskOrchestration create() { orchestrationFactories, new JacksonDataConverter(), DEFAULT_MAXIMUM_TIMER_INTERVAL, - logger); + logger, + null); // No app ID for static runner // TODO: Error handling TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( diff --git a/client/src/main/java/io/dapr/durabletask/TaskOptions.java b/client/src/main/java/io/dapr/durabletask/TaskOptions.java index 50f47273..79ff542f 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOptions.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOptions.java @@ -8,26 +8,55 @@ public final class TaskOptions { private final RetryPolicy retryPolicy; private final RetryHandler retryHandler; + private final String appID; - public TaskOptions(RetryPolicy retryPolicy, RetryHandler retryHandler) { + private TaskOptions(RetryPolicy retryPolicy, RetryHandler retryHandler, String appID) { this.retryPolicy = retryPolicy; this.retryHandler = retryHandler; + this.appID = appID; + } + + /** + * Creates a new builder for {@code TaskOptions}. + * @return a new builder instance + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Creates a new {@code TaskOptions} object with default values. + * @return a new TaskOptions instance with no configuration + */ + public static TaskOptions create() { + return new Builder().build(); } /** * Creates a new {@code TaskOptions} object from a {@link RetryPolicy}. * @param retryPolicy the retry policy to use in the new {@code TaskOptions} object. + * @return a new TaskOptions instance with the specified retry policy */ - public TaskOptions(RetryPolicy retryPolicy) { - this(retryPolicy, null); + public static TaskOptions withRetryPolicy(RetryPolicy retryPolicy) { + return new Builder().retryPolicy(retryPolicy).build(); } /** * Creates a new {@code TaskOptions} object from a {@link RetryHandler}. * @param retryHandler the retry handler to use in the new {@code TaskOptions} object. + * @return a new TaskOptions instance with the specified retry handler */ - public TaskOptions(RetryHandler retryHandler) { - this(null, retryHandler); + public static TaskOptions withRetryHandler(RetryHandler retryHandler) { + return new Builder().retryHandler(retryHandler).build(); + } + + /** + * Creates a new {@code TaskOptions} object with the specified app ID. + * @param appID the app ID to use for cross-app workflow routing + * @return a new TaskOptions instance with the specified app ID + */ + public static TaskOptions withAppID(String appID) { + return new Builder().appID(appID).build(); } boolean hasRetryPolicy() { @@ -53,4 +82,67 @@ boolean hasRetryHandler() { public RetryHandler getRetryHandler() { return this.retryHandler; } + + /** + * Gets the configured app ID value or {@code null} if none was configured. + * @return the configured app ID + */ + public String getAppID() { + return this.appID; + } + + boolean hasAppID() { + return this.appID != null && !this.appID.isEmpty(); + } + + /** + * Builder for creating {@code TaskOptions} instances. + */ + public static final class Builder { + private RetryPolicy retryPolicy; + private RetryHandler retryHandler; + private String appID; + + private Builder() { + // Private constructor -enforces using TaskOptions.builder() + } + + /** + * Sets the retry policy for the task options. + * @param retryPolicy the retry policy to use + * @return this builder instance for method chaining + */ + public Builder retryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + return this; + } + + /** + * Sets the retry handler for the task options. + * @param retryHandler the retry handler to use + * @return this builder instance for method chaining + */ + public Builder retryHandler(RetryHandler retryHandler) { + this.retryHandler = retryHandler; + return this; + } + + /** + * Sets the app ID for cross-app workflow routing. + * @param appID the app ID to use + * @return this builder instance for method chaining + */ + public Builder appID(String appID) { + this.appID = appID; + return this; + } + + /** + * Builds a new {@code TaskOptions} instance with the configured values. + * @return a new TaskOptions instance + */ + public TaskOptions build() { + return new TaskOptions(this.retryPolicy, this.retryHandler, this.appID); + } + } } diff --git a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java index 2d4749ee..94bc18a1 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java @@ -36,6 +36,13 @@ public interface TaskOrchestrationContext { */ String getInstanceId(); + /** + * Gets the app ID of the current orchestration instance, if available. + * This is used for cross-app workflow routing. + * @return the app ID of the current orchestration instance, or null if not available + */ + String getAppId(); + /** * Gets the current orchestration time in UTC. * @return the current orchestration time in UTC diff --git a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index 184b4f3b..6405a114 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -30,16 +30,19 @@ final class TaskOrchestrationExecutor { private final DataConverter dataConverter; private final Logger logger; private final Duration maximumTimerInterval; + private final String appId; public TaskOrchestrationExecutor( HashMap orchestrationFactories, DataConverter dataConverter, Duration maximumTimerInterval, - Logger logger) { + Logger logger, + String appId) { this.orchestrationFactories = orchestrationFactories; this.dataConverter = dataConverter; this.maximumTimerInterval = maximumTimerInterval; this.logger = logger; + this.appId = appId; // extracted from router } public TaskOrchestratorResult execute(List pastEvents, List newEvents) { @@ -51,6 +54,7 @@ public TaskOrchestratorResult execute(List pastEvents, List pendingActions = new LinkedHashMap<>(); @@ -137,6 +142,15 @@ private void setInstanceId(String instanceId) { this.instanceId = instanceId; } + @Override + public String getAppId() { + return this.appId; + } + + private void setAppId(String appId) { + this.appId = appId; + } + @Override public Instant getCurrentInstant() { // TODO: Throw if instant is null @@ -270,12 +284,37 @@ public Task callActivity( scheduleTaskBuilder.setInput(StringValue.of(serializedInput)); } + // Add router information for cross-app routing + // Router always has a source app ID from EXECUTIONSTARTED event + TaskRouter.Builder routerBuilder = TaskRouter.newBuilder() + .setSourceAppID(this.appId); + + // Add target app ID if specified in options + if (options != null && options.hasAppID()) { + String targetAppId = options.getAppID(); + routerBuilder.setTargetAppID(targetAppId); + this.logger.fine(() -> String.format( + "cross app routing detected: source=%s, target=%s", + this.appId, targetAppId)); + } + TaskRouter router = routerBuilder.build(); + scheduleTaskBuilder.setRouter(router); TaskFactory taskFactory = () -> { int id = this.sequenceNumber++; - this.pendingActions.put(id, OrchestratorAction.newBuilder() + + ScheduleTaskAction scheduleTaskAction = scheduleTaskBuilder.build(); + OrchestratorAction.Builder actionBuilder = OrchestratorAction.newBuilder() .setId(id) - .setScheduleTask(scheduleTaskBuilder) - .build()); + .setScheduleTask(scheduleTaskBuilder); + TaskRouter.Builder actionRouterBuilder = TaskRouter.newBuilder() + .setSourceAppID(this.appId); + if (options != null && options.hasAppID()) { + String targetAppId = options.getAppID(); + actionRouterBuilder.setTargetAppID(targetAppId); + } + + actionBuilder.setRouter(actionRouterBuilder.build()); + this.pendingActions.put(id, actionBuilder.build()); if (!this.isReplaying) { this.logger.fine(() -> String.format( @@ -337,11 +376,11 @@ public void sendEvent(String instanceId, String eventName, Object eventData) { if (serializedEventData != null){ builder.setData(StringValue.of(serializedEventData)); } + OrchestratorAction.Builder actionBuilder = OrchestratorAction.newBuilder() + .setId(id) + .setSendEvent(builder); - this.pendingActions.put(id, OrchestratorAction.newBuilder() - .setId(id) - .setSendEvent(builder) - .build()); + this.pendingActions.put(id, actionBuilder.build()); if (!this.isReplaying) { this.logger.fine(() -> String.format( @@ -379,6 +418,7 @@ public Task callSubOrchestrator( } createSubOrchestrationActionBuilder.setInstanceId(instanceId); + // TODO: @cicoyle - add suborchestration cross app logic here when its supported TaskFactory taskFactory = () -> { int id = this.sequenceNumber++; this.pendingActions.put(id, OrchestratorAction.newBuilder() @@ -821,33 +861,42 @@ private void processEvent(HistoryEvent e) { if (this.isSuspended && !overrideSuspension) { this.handleEventWhileSuspended(e); } else { + this.logger.fine(() -> this.instanceId + ": Processing event: " + e.getEventTypeCase()); switch (e.getEventTypeCase()) { case ORCHESTRATORSTARTED: Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp()); this.setCurrentInstant(instant); + this.logger.fine(() -> this.instanceId + ": Workflow orchestrator started"); break; case ORCHESTRATORCOMPLETED: - // No action + // No action needed + this.logger.fine(() -> this.instanceId + ": Workflow orchestrator completed"); break; case EXECUTIONSTARTED: - ExecutionStartedEvent startedEvent = e.getExecutionStarted(); - String name = startedEvent.getName(); - this.setName(name); - String instanceId = startedEvent.getOrchestrationInstance().getInstanceId(); - this.setInstanceId(instanceId); - String input = startedEvent.getInput().getValue(); - this.setInput(input); - TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name); + ExecutionStartedEvent executionStarted = e.getExecutionStarted(); + this.setName(executionStarted.getName()); + this.setInput(executionStarted.getInput().getValue()); + this.setInstanceId(executionStarted.getOrchestrationInstance().getInstanceId()); + this.logger.fine(() -> this.instanceId + ": Workflow execution started"); + this.setAppId(e.getRouter().getSourceAppID()); + + // Create and invoke the workflow orchestrator + TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(executionStarted.getName()); if (factory == null) { // Try getting the default orchestrator factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*"); } // TODO: Throw if the factory is null (orchestration by that name doesn't exist) + if (factory == null) { + throw new IllegalStateException("No factory found for orchestrator: " + executionStarted.getName()); + } + TaskOrchestration orchestrator = factory.create(); orchestrator.run(this); break; -// case EXECUTIONCOMPLETED: -// break; + case EXECUTIONCOMPLETED: + this.logger.fine(() -> this.instanceId + ": Workflow execution completed"); + break; // case EXECUTIONFAILED: // break; case EXECUTIONTERMINATED: diff --git a/client/src/test/java/io/dapr/durabletask/ErrorHandlingIntegrationTests.java b/client/src/test/java/io/dapr/durabletask/ErrorHandlingIntegrationTests.java index 4d78bc97..4a178509 100644 --- a/client/src/test/java/io/dapr/durabletask/ErrorHandlingIntegrationTests.java +++ b/client/src/test/java/io/dapr/durabletask/ErrorHandlingIntegrationTests.java @@ -112,7 +112,7 @@ public void retryActivityFailures(int maxNumberOfAttempts) throws TimeoutExcepti ctx.callActivity( "BustedActivity", null, - new TaskOptions(retryPolicy)).await(); + TaskOptions.withRetryPolicy(retryPolicy)).await(); }); } @@ -125,7 +125,7 @@ public void retryActivityFailuresWithCustomLogic(int maxNumberOfAttempts) throws // Run the test and get back the details of the last failure this.retryOnFailuresCoreTest(maxNumberOfAttempts, maxNumberOfAttempts, ctx -> { RetryHandler retryHandler = getCommonRetryHandler(retryHandlerCalls, maxNumberOfAttempts); - TaskOptions options = new TaskOptions(retryHandler); + TaskOptions options = TaskOptions.withRetryHandler(retryHandler); ctx.callActivity("BustedActivity", null, options).await(); }); @@ -194,7 +194,7 @@ public void retrySubOrchestratorFailures(int maxNumberOfAttempts) throws Timeout "BustedSubOrchestrator", null, null, - new TaskOptions(retryPolicy)).await(); + TaskOptions.withRetryPolicy(retryPolicy)).await(); }); } @@ -207,7 +207,7 @@ public void retrySubOrchestrationFailuresWithCustomLogic(int maxNumberOfAttempts // Run the test and get back the details of the last failure this.retryOnFailuresCoreTest(maxNumberOfAttempts, maxNumberOfAttempts, ctx -> { RetryHandler retryHandler = getCommonRetryHandler(retryHandlerCalls, maxNumberOfAttempts); - TaskOptions options = new TaskOptions(retryHandler); + TaskOptions options = TaskOptions.withRetryHandler(retryHandler); ctx.callSubOrchestrator("BustedSubOrchestrator", null, null, options).await(); }); diff --git a/client/src/test/java/io/dapr/durabletask/IntegrationTests.java b/client/src/test/java/io/dapr/durabletask/IntegrationTests.java index 033d8691..8043bc6e 100644 --- a/client/src/test/java/io/dapr/durabletask/IntegrationTests.java +++ b/client/src/test/java/io/dapr/durabletask/IntegrationTests.java @@ -1370,7 +1370,7 @@ void activityAllOf() throws IOException, TimeoutException { final int activityCount = 10; final AtomicBoolean throwException = new AtomicBoolean(true); final RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(5)); - final TaskOptions taskOptions = new TaskOptions(retryPolicy); + final TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); DurableTaskGrpcWorker worker = this.createWorkerBuilder() .addOrchestrator(orchestratorName, ctx -> { @@ -1428,7 +1428,7 @@ void activityAllOfException() throws IOException, TimeoutException { final String result = "test fail"; final int activityMiddle = 5; final RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(5)); - final TaskOptions taskOptions = new TaskOptions(retryPolicy); + final TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); DurableTaskGrpcWorker worker = this.createWorkerBuilder() .addOrchestrator(orchestratorName, ctx -> { @@ -1491,7 +1491,7 @@ void activityAnyOf() throws IOException, TimeoutException { final int activityCount = 10; final AtomicBoolean throwException = new AtomicBoolean(true); final RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(5)); - final TaskOptions taskOptions = new TaskOptions(retryPolicy); + final TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); DurableTaskGrpcWorker worker = this.createWorkerBuilder() .addOrchestrator(orchestratorName, ctx -> { @@ -1585,7 +1585,7 @@ public void taskExecutionIdTest() { var orchestratorName = "test-task-execution-id"; var retryActivityName = "RetryN"; final RetryPolicy retryPolicy = new RetryPolicy(4, Duration.ofSeconds(3)); - final TaskOptions taskOptions = new TaskOptions(retryPolicy); + final TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); var execMap = new HashMap(); diff --git a/client/src/test/java/io/dapr/durabletask/TaskOptionsTest.java b/client/src/test/java/io/dapr/durabletask/TaskOptionsTest.java new file mode 100644 index 00000000..47e90f89 --- /dev/null +++ b/client/src/test/java/io/dapr/durabletask/TaskOptionsTest.java @@ -0,0 +1,141 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask; + +import org.junit.jupiter.api.Test; +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for TaskOptions with cross-app workflow support. + */ +public class TaskOptionsTest { + + @Test + void taskOptionsWithAppID() { + TaskOptions options = TaskOptions.withAppID("app1"); + + assertTrue(options.hasAppID()); + assertEquals("app1", options.getAppID()); + assertFalse(options.hasRetryPolicy()); + assertFalse(options.hasRetryHandler()); + } + + @Test + void taskOptionsWithRetryPolicyAndAppID() { + RetryPolicy retryPolicy = new RetryPolicy(3, Duration.ofSeconds(1)); + TaskOptions options = TaskOptions.builder() + .retryPolicy(retryPolicy) + .appID("app2") + .build(); + + assertTrue(options.hasAppID()); + assertEquals("app2", options.getAppID()); + assertTrue(options.hasRetryPolicy()); + assertEquals(retryPolicy, options.getRetryPolicy()); + assertFalse(options.hasRetryHandler()); + } + + @Test + void taskOptionsWithRetryHandlerAndAppID() { + RetryHandler retryHandler = new RetryHandler() { + @Override + public boolean handle(RetryContext context) { + return context.getLastAttemptNumber() < 2; + } + }; + TaskOptions options = TaskOptions.builder() + .retryHandler(retryHandler) + .appID("app3") + .build(); + + assertTrue(options.hasAppID()); + assertEquals("app3", options.getAppID()); + assertFalse(options.hasRetryPolicy()); + assertTrue(options.hasRetryHandler()); + assertEquals(retryHandler, options.getRetryHandler()); + } + + @Test + void taskOptionsWithoutAppID() { + TaskOptions options = TaskOptions.create(); + + assertFalse(options.hasAppID()); + assertNull(options.getAppID()); + } + + @Test + void taskOptionsWithEmptyAppID() { + TaskOptions options = TaskOptions.withAppID(""); + + assertFalse(options.hasAppID()); + assertEquals("", options.getAppID()); + } + + @Test + void taskOptionsWithNullAppID() { + TaskOptions options = TaskOptions.builder().appID(null).build(); + + assertFalse(options.hasAppID()); + assertNull(options.getAppID()); + } + + @Test + void taskOptionsWithRetryPolicy() { + RetryPolicy retryPolicy = new RetryPolicy(5, Duration.ofMinutes(1)); + TaskOptions options = TaskOptions.withRetryPolicy(retryPolicy); + + assertTrue(options.hasRetryPolicy()); + assertEquals(retryPolicy, options.getRetryPolicy()); + assertFalse(options.hasRetryHandler()); + assertFalse(options.hasAppID()); + } + + @Test + void taskOptionsWithRetryHandler() { + RetryHandler retryHandler = new RetryHandler() { + @Override + public boolean handle(RetryContext context) { + return context.getLastAttemptNumber() < 3; + } + }; + TaskOptions options = TaskOptions.withRetryHandler(retryHandler); + + assertTrue(options.hasRetryHandler()); + assertEquals(retryHandler, options.getRetryHandler()); + assertFalse(options.hasRetryPolicy()); + assertFalse(options.hasAppID()); + } + + @Test + void taskOptionsWithBuilderChaining() { + RetryPolicy retryPolicy = new RetryPolicy(3, Duration.ofSeconds(1)); + RetryHandler retryHandler = context -> true; + + TaskOptions options = TaskOptions.builder() + .retryPolicy(retryPolicy) + .retryHandler(retryHandler) + .appID("test-app") + .build(); + + assertNotNull(options); + assertTrue(options.hasRetryPolicy()); + assertEquals(retryPolicy, options.getRetryPolicy()); + assertTrue(options.hasRetryHandler()); + assertEquals(retryHandler, options.getRetryHandler()); + assertTrue(options.hasAppID()); + assertEquals("test-app", options.getAppID()); + } +} \ No newline at end of file diff --git a/endtoendtests/src/main/java/com/functions/ParallelFunctions.java b/endtoendtests/src/main/java/com/functions/ParallelFunctions.java index 2a744e73..852b3be9 100644 --- a/endtoendtests/src/main/java/com/functions/ParallelFunctions.java +++ b/endtoendtests/src/main/java/com/functions/ParallelFunctions.java @@ -33,7 +33,7 @@ public HttpResponseMessage startParallelOrchestration( public List parallelOrchestratorSad( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(5)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); List> tasks = new ArrayList<>(); tasks.add(ctx.callActivity("AppendSad", "Input1", taskOptions, String.class)); tasks.add(ctx.callActivity("AppendSad", "Input2", taskOptions, String.class)); @@ -82,7 +82,7 @@ public HttpResponseMessage startParallelAnyOf( public Object parallelAnyOf( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(5)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); List> tasks = new ArrayList<>(); tasks.add(ctx.callActivity("AppendHappy", "AnyOf1", taskOptions, String.class)); tasks.add(ctx.callActivity("AppendHappy", "AnyOf2", String.class)); @@ -110,7 +110,7 @@ public List parallelCatchException( try { List> tasks = new ArrayList<>(); RetryPolicy policy = new RetryPolicy(2, Duration.ofSeconds(1)); - TaskOptions options = new TaskOptions(policy); + TaskOptions options = TaskOptions.withRetryPolicy(policy); tasks.add(ctx.callActivity("AlwaysException", "Input1", options, String.class)); tasks.add(ctx.callActivity("AppendHappy", "Input2", options, String.class)); return ctx.allOf(tasks).await(); diff --git a/endtoendtests/src/main/java/com/functions/RetriableTask.java b/endtoendtests/src/main/java/com/functions/RetriableTask.java index 95377f3b..9b267ef2 100644 --- a/endtoendtests/src/main/java/com/functions/RetriableTask.java +++ b/endtoendtests/src/main/java/com/functions/RetriableTask.java @@ -42,7 +42,7 @@ public HttpResponseMessage retriableOrchestration( public String retriableTask( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(1)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); return ctx.callActivity("Append", "Test-Input", taskOptions, String.class).await(); } @@ -88,7 +88,7 @@ public HttpResponseMessage retriableOrchestrationSuccess( public String retriableTaskFail( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(1)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); return ctx.callActivity("AppendFail", "Test-Input", taskOptions, String.class).await(); } @@ -96,7 +96,7 @@ public String retriableTaskFail( public String retriableTaskSuccess( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(3, Duration.ofSeconds(1)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); return ctx.callActivity("AppendSuccess", "Test-Input", taskOptions, String.class).await(); } diff --git a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH index c06f9bc7..d8ec2af9 100644 --- a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH +++ b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH @@ -1 +1 @@ -545663a4db0040bb02c642c27010e337b9be8d31 \ No newline at end of file +4b86756497d875b97f9a91051781b5711c1e4fa6 \ No newline at end of file diff --git a/internal/durabletask-protobuf/README.md b/internal/durabletask-protobuf/README.md index 6b310469..7193317d 100644 --- a/internal/durabletask-protobuf/README.md +++ b/internal/durabletask-protobuf/README.md @@ -1,6 +1,6 @@ # Durable Task Protobuf Files -This directory contains the protocol buffer definitions used by the Durable Task Framework Java SDK. The files in this directory are automatically downloaded and updated during the build process from the [microsoft/durabletask-protobuf](https://github.com/microsoft/durabletask-protobuf) repository. +This directory contains the protocol buffer definitions used by the Durable Task Framework Java SDK. The files in this directory are automatically downloaded and updated during the build process from the [dapr/durabletask-protobuf](https://github.com/dapr/durabletask-protobuf) repository. ## Directory Structure @@ -21,4 +21,6 @@ If you need to manually update the proto files, you can run: ```bash ./gradlew downloadProtoFiles -PprotoBranch= -``` \ No newline at end of file +``` + +See [here for more instructions.](../../CONTRIBUTING.md#updating-protobuf-definitions) \ No newline at end of file diff --git a/internal/durabletask-protobuf/protos/orchestrator_service.proto b/internal/durabletask-protobuf/protos/orchestrator_service.proto index c79aab16..41fad9de 100644 --- a/internal/durabletask-protobuf/protos/orchestrator_service.proto +++ b/internal/durabletask-protobuf/protos/orchestrator_service.proto @@ -13,8 +13,8 @@ import "google/protobuf/wrappers.proto"; import "google/protobuf/empty.proto"; message TaskRouter { - string source = 1; // orchestrationAppID - optional string target = 2; // appID + string sourceAppID = 1; + optional string targetAppID = 2; } message OrchestrationInstance { @@ -64,6 +64,7 @@ message ParentInstanceInfo { google.protobuf.StringValue name = 2; google.protobuf.StringValue version = 3; OrchestrationInstance orchestrationInstance = 4; + optional string appID = 5; } message TraceContext { diff --git a/samples-azure-functions/src/main/java/com/functions/ParallelFunctions.java b/samples-azure-functions/src/main/java/com/functions/ParallelFunctions.java index f7797705..74f4e399 100644 --- a/samples-azure-functions/src/main/java/com/functions/ParallelFunctions.java +++ b/samples-azure-functions/src/main/java/com/functions/ParallelFunctions.java @@ -33,7 +33,7 @@ public HttpResponseMessage startParallelOrchestration( public List parallelOrchestratorSad( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(5)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); List> tasks = new ArrayList<>(); tasks.add(ctx.callActivity("AppendSad", "Input1", taskOptions, String.class)); tasks.add(ctx.callActivity("AppendSad", "Input2", taskOptions, String.class)); @@ -82,7 +82,7 @@ public HttpResponseMessage startParallelAnyOf( public Object parallelAnyOf( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(5)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); List> tasks = new ArrayList<>(); tasks.add(ctx.callActivity("AppendHappy", "AnyOf1", taskOptions, String.class)); tasks.add(ctx.callActivity("AppendHappy", "AnyOf2", String.class)); @@ -110,7 +110,7 @@ public List parallelCatchException( try { List> tasks = new ArrayList<>(); RetryPolicy policy = new RetryPolicy(2, Duration.ofSeconds(1)); - TaskOptions options = new TaskOptions(policy); + TaskOptions options = TaskOptions.withRetryPolicy(policy); tasks.add(ctx.callActivity("AlwaysException", "Input1", options, String.class)); tasks.add(ctx.callActivity("AppendHappy", "Input2", options, String.class)); return ctx.allOf(tasks).await(); diff --git a/samples-azure-functions/src/main/java/com/functions/RetriableTask.java b/samples-azure-functions/src/main/java/com/functions/RetriableTask.java index 95377f3b..9b267ef2 100644 --- a/samples-azure-functions/src/main/java/com/functions/RetriableTask.java +++ b/samples-azure-functions/src/main/java/com/functions/RetriableTask.java @@ -42,7 +42,7 @@ public HttpResponseMessage retriableOrchestration( public String retriableTask( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(1)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); return ctx.callActivity("Append", "Test-Input", taskOptions, String.class).await(); } @@ -88,7 +88,7 @@ public HttpResponseMessage retriableOrchestrationSuccess( public String retriableTaskFail( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(1)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); return ctx.callActivity("AppendFail", "Test-Input", taskOptions, String.class).await(); } @@ -96,7 +96,7 @@ public String retriableTaskFail( public String retriableTaskSuccess( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(3, Duration.ofSeconds(1)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); return ctx.callActivity("AppendSuccess", "Test-Input", taskOptions, String.class).await(); }