From cfb9da33039c4052a157c71c61ef8139ab3f834f Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Tue, 24 Jun 2025 17:02:18 -0500 Subject: [PATCH 01/11] update protos Signed-off-by: Cassandra Coyle --- CONTRIBUTING.md | 12 ++++++++++ .../PROTO_SOURCE_COMMIT_HASH | 2 +- internal/durabletask-protobuf/README.md | 2 +- .../protos/orchestrator_service.proto | 24 +++++++++++++------ 4 files changed, 31 insertions(+), 9 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index fd898fa7..cb197613 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -2,4 +2,16 @@ Build & test: ```shell ./gradlew build +``` + +## Updating Protobuf Definitions + +When updating the protobuf definitions in `internal/durabletask-protobuf/protos/orchestrator_service.proto`: + +1. Manually copy the updated protobuf file from dapr/durabletask-protobuf +2. Update the commit hash in `internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH` to reflect the new commit +3. Regenerate the Java classes from the protobuf definitions: + +```shell +./gradlew generateProto ``` \ No newline at end of file diff --git a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH index a84ead2a..3e564a57 100644 --- a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH +++ b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH @@ -1 +1 @@ -cc00765eeb3307f8fdac7da610915d3a0757702b \ No newline at end of file +ce184193bcf2d4dcf643bcc934d3ea897c4c8c5c \ No newline at end of file diff --git a/internal/durabletask-protobuf/README.md b/internal/durabletask-protobuf/README.md index 6b310469..4c3f2986 100644 --- a/internal/durabletask-protobuf/README.md +++ b/internal/durabletask-protobuf/README.md @@ -1,6 +1,6 @@ # Durable Task Protobuf Files -This directory contains the protocol buffer definitions used by the Durable Task Framework Java SDK. The files in this directory are automatically downloaded and updated during the build process from the [microsoft/durabletask-protobuf](https://github.com/microsoft/durabletask-protobuf) repository. +This directory contains the protocol buffer definitions used by the Durable Task Framework Java SDK. The files in this directory are automatically downloaded and updated during the build process from the [dapr/durabletask-protobuf](https://github.com/dapr/durabletask-protobuf) repository. ## Directory Structure diff --git a/internal/durabletask-protobuf/protos/orchestrator_service.proto b/internal/durabletask-protobuf/protos/orchestrator_service.proto index 90d7f918..17c40077 100644 --- a/internal/durabletask-protobuf/protos/orchestrator_service.proto +++ b/internal/durabletask-protobuf/protos/orchestrator_service.proto @@ -3,7 +3,7 @@ syntax = "proto3"; -option csharp_namespace = "Microsoft.DurableTask.Protobuf"; +option csharp_namespace = "Dapr.DurableTask.Protobuf"; option java_package = "io.dapr.durabletask.implementation.protobuf"; option go_package = "/api/protos"; @@ -12,6 +12,11 @@ import "google/protobuf/duration.proto"; import "google/protobuf/wrappers.proto"; import "google/protobuf/empty.proto"; +message TaskRouter { + string source = 1; // orchestrationAppID + string target = 2; // appID +} + message OrchestrationInstance { string instanceId = 1; google.protobuf.StringValue executionId = 2; @@ -192,10 +197,10 @@ message EntityOperationCalledEvent { } message EntityLockRequestedEvent { - string criticalSectionId = 1; + string criticalSectionId = 1; repeated string lockSet = 2; int32 position = 3; - google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories + google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories } message EntityOperationCompletedEvent { @@ -210,14 +215,14 @@ message EntityOperationFailedEvent { message EntityUnlockSentEvent { string criticalSectionId = 1; - google.protobuf.StringValue parentInstanceId = 2; // used only within messages, null in histories + google.protobuf.StringValue parentInstanceId = 2; // used only within messages, null in histories google.protobuf.StringValue targetInstanceId = 3; // used only within histories, null in messages } message EntityLockGrantedEvent { string criticalSectionId = 1; } - + message HistoryEvent { int32 eventId = 1; google.protobuf.Timestamp timestamp = 2; @@ -244,18 +249,20 @@ message HistoryEvent { ExecutionResumedEvent executionResumed = 22; EntityOperationSignaledEvent entityOperationSignaled = 23; EntityOperationCalledEvent entityOperationCalled = 24; - EntityOperationCompletedEvent entityOperationCompleted = 25; - EntityOperationFailedEvent entityOperationFailed = 26; + EntityOperationCompletedEvent entityOperationCompleted = 25; + EntityOperationFailedEvent entityOperationFailed = 26; EntityLockRequestedEvent entityLockRequested = 27; EntityLockGrantedEvent entityLockGranted = 28; EntityUnlockSentEvent entityUnlockSent = 29; } + optional TaskRouter router = 30; } message ScheduleTaskAction { string name = 1; google.protobuf.StringValue version = 2; google.protobuf.StringValue input = 3; + optional TaskRouter router = 4; } message CreateSubOrchestrationAction { @@ -263,6 +270,7 @@ message CreateSubOrchestrationAction { string name = 2; google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; + optional TaskRouter router = 5; } message CreateTimerAction { @@ -311,6 +319,7 @@ message OrchestratorAction { TerminateOrchestrationAction terminateOrchestration = 7; SendEntityMessageAction sendEntityMessage = 8; } + optional TaskRouter router = 9; } message OrchestratorRequest { @@ -320,6 +329,7 @@ message OrchestratorRequest { repeated HistoryEvent newEvents = 4; OrchestratorEntityParameters entityParameters = 5; bool requiresHistoryStreaming = 6; + optional TaskRouter router = 7; } message OrchestratorResponse { From f46600ccfa91436591e967066b1aaa4ad011b03d Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Tue, 24 Jun 2025 17:02:41 -0500 Subject: [PATCH 02/11] update to use dapr release Signed-off-by: Cassandra Coyle --- README.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c816022f..6c33c3e5 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Durable Task Client SDK for Java -[![Build](https://github.com/microsoft/durabletask-java/actions/workflows/build-validation.yml/badge.svg)](https://github.com/microsoft/durabletask-java/actions/workflows/build-validation.yml) +[![Build](https://github.com/dapr/durabletask-java/actions/workflows/build-validation.yml/badge.svg)](https://github.com/dapr/durabletask-java/actions/workflows/build-validation.yml) [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) This repo contains the Java SDK for the Durable Task Framework as well as classes and annotations to support running [Azure Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview?tabs=java) for Java. With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Java code. @@ -78,8 +78,7 @@ The following packages are produced from this repo. | Package | Latest version | | - | - | -| Durable Task - Client | [![Maven Central](https://img.shields.io/maven-central/v/com.microsoft/durabletask-client?label=durabletask-client)](https://mvnrepository.com/artifact/com.microsoft/durabletask-client/1.0.0) | -| Durable Task - Azure Functions | [![Maven Central](https://img.shields.io/maven-central/v/com.microsoft/durabletask-azure-functions?label=durabletask-azure-functions)](https://mvnrepository.com/artifact/com.microsoft/durabletask-azure-functions/1.0.1) | +| Durable Task - Client | [![Maven Central](https://img.shields.io/maven-central/v/io.dapr/durabletask-client?label=durabletask-client)](https://mvnrepository.com/artifact/io.dapr/durabletask-client/1.5.7) | ## Getting started with Azure Functions From ee93882baae7e343d250377279766987fd485f73 Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Wed, 25 Jun 2025 16:55:56 -0500 Subject: [PATCH 03/11] add appID support for cross app wf and unit tests for task options Signed-off-by: Cassandra Coyle --- CONTRIBUTING.md | 5 ++ .../java/io/dapr/durabletask/TaskOptions.java | 41 ++++++++- .../io/dapr/durabletask/TaskOptionsTest.java | 88 +++++++++++++++++++ 3 files changed, 132 insertions(+), 2 deletions(-) create mode 100644 client/src/test/java/io/dapr/durabletask/TaskOptionsTest.java diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index cb197613..617e65d3 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,3 +1,8 @@ +Verify Compilation: +```shell +./gradlew compileJava +``` + Build & test: ```shell diff --git a/client/src/main/java/io/dapr/durabletask/TaskOptions.java b/client/src/main/java/io/dapr/durabletask/TaskOptions.java index 50f47273..fcb34c33 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOptions.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOptions.java @@ -8,10 +8,18 @@ public final class TaskOptions { private final RetryPolicy retryPolicy; private final RetryHandler retryHandler; + private final String appID; + + public TaskOptions(RetryPolicy retryPolicy, RetryHandler retryHandler, String appID) { + this.retryPolicy = retryPolicy; + this.retryHandler = retryHandler; + this.appID = appID; + } public TaskOptions(RetryPolicy retryPolicy, RetryHandler retryHandler) { this.retryPolicy = retryPolicy; this.retryHandler = retryHandler; + this.appID = null; } /** @@ -19,7 +27,7 @@ public TaskOptions(RetryPolicy retryPolicy, RetryHandler retryHandler) { * @param retryPolicy the retry policy to use in the new {@code TaskOptions} object. */ public TaskOptions(RetryPolicy retryPolicy) { - this(retryPolicy, null); + this(retryPolicy, null, null); } /** @@ -27,7 +35,24 @@ public TaskOptions(RetryPolicy retryPolicy) { * @param retryHandler the retry handler to use in the new {@code TaskOptions} object. */ public TaskOptions(RetryHandler retryHandler) { - this(null, retryHandler); + this(null, retryHandler, null); + } + + /** + * Creates a new {@code TaskOptions} object with the specified app ID. + * @param appID the app ID to use for cross-app workflow routing + */ + public TaskOptions(String appID) { + this(null, null, appID); + } + + /** + * Creates a new {@code TaskOptions} object with the specified retry policy and app ID. + * @param retryPolicy the retry policy to use + * @param appID the app ID to use for cross-app workflow routing + */ + public TaskOptions(RetryPolicy retryPolicy, String appID) { + this(retryPolicy, null, appID); } boolean hasRetryPolicy() { @@ -53,4 +78,16 @@ boolean hasRetryHandler() { public RetryHandler getRetryHandler() { return this.retryHandler; } + + /** + * Gets the configured app ID value or {@code null} if none was configured. + * @return the configured app ID + */ + public String getAppID() { + return this.appID; + } + + boolean hasAppID() { + return this.appID != null && !this.appID.isEmpty(); + } } diff --git a/client/src/test/java/io/dapr/durabletask/TaskOptionsTest.java b/client/src/test/java/io/dapr/durabletask/TaskOptionsTest.java new file mode 100644 index 00000000..007b258d --- /dev/null +++ b/client/src/test/java/io/dapr/durabletask/TaskOptionsTest.java @@ -0,0 +1,88 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask; + +import org.junit.jupiter.api.Test; +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for TaskOptions with cross-app workflow support. + */ +public class TaskOptionsTest { + + @Test + void taskOptionsWithAppID() { + TaskOptions options = new TaskOptions("app1"); + + assertTrue(options.hasAppID()); + assertEquals("app1", options.getAppID()); + assertFalse(options.hasRetryPolicy()); + assertFalse(options.hasRetryHandler()); + } + + @Test + void taskOptionsWithRetryPolicyAndAppID() { + RetryPolicy retryPolicy = new RetryPolicy(3, Duration.ofSeconds(1)); + TaskOptions options = new TaskOptions(retryPolicy, null, "app2"); + + assertTrue(options.hasAppID()); + assertEquals("app2", options.getAppID()); + assertTrue(options.hasRetryPolicy()); + assertEquals(retryPolicy, options.getRetryPolicy()); + assertFalse(options.hasRetryHandler()); + } + + @Test + void taskOptionsWithRetryHandlerAndAppID() { + RetryHandler retryHandler = new RetryHandler() { + @Override + public boolean handle(RetryContext context) { + return context.getLastAttemptNumber() < 2; + } + }; + TaskOptions options = new TaskOptions(null, retryHandler, "app3"); + + assertTrue(options.hasAppID()); + assertEquals("app3", options.getAppID()); + assertFalse(options.hasRetryPolicy()); + assertTrue(options.hasRetryHandler()); + assertEquals(retryHandler, options.getRetryHandler()); + } + + @Test + void taskOptionsWithoutAppID() { + TaskOptions options = new TaskOptions(null, null, null); + + assertFalse(options.hasAppID()); + assertNull(options.getAppID()); + } + + @Test + void taskOptionsWithEmptyAppID() { + TaskOptions options = new TaskOptions(""); + + assertFalse(options.hasAppID()); + assertEquals("", options.getAppID()); + } + + @Test + void taskOptionsWithNullAppID() { + TaskOptions options = new TaskOptions(null, null, null); + + assertFalse(options.hasAppID()); + assertNull(options.getAppID()); + } +} \ No newline at end of file From f47fa4f0c9da46bb7ec69624db5b915d64dae75c Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Thu, 3 Jul 2025 15:20:14 -0500 Subject: [PATCH 04/11] wip cross app call activity for wf Signed-off-by: Cassandra Coyle --- CONTRIBUTING.md | 17 ++++++++++- client/build.gradle | 5 ++++ .../durabletask/DurableTaskGrpcWorker.java | 19 ++++++++++-- .../NewOrchestrationInstanceOptions.java | 30 +++++++++++++++++++ .../durabletask/TaskOrchestrationContext.java | 7 +++++ .../PROTO_SOURCE_COMMIT_HASH | 2 +- .../protos/orchestrator_service.proto | 7 ++++- 7 files changed, 82 insertions(+), 5 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 617e65d3..df9c5ec3 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -19,4 +19,19 @@ When updating the protobuf definitions in `internal/durabletask-protobuf/protos/ ```shell ./gradlew generateProto -``` \ No newline at end of file +``` + +## Test locally from dapr/java-sdk + +```shell +./gradlew publishToMavenLocal +``` + +or simply `./gradlew build publishToMavenLocal` + +Check if it was released locally with: +```shell +ls ~/.m2/repository/io/dapr/durabletask-client/ +``` + +Then update the durabletask-client in the java-sdk pom.xml file to the newest version you released locally. \ No newline at end of file diff --git a/client/build.gradle b/client/build.gradle index b51ce513..1f2fdb35 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -26,6 +26,7 @@ dependencies { // https://github.com/grpc/grpc-java#download implementation "io.grpc:grpc-protobuf:${grpcVersion}" implementation "io.grpc:grpc-stub:${grpcVersion}" + implementation 'com.google.protobuf:protobuf-java:3.25.5' runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}" compileOnly "org.apache.tomcat:annotations-api:6.0.53" @@ -194,6 +195,10 @@ java { withJavadocJar() } +tasks.named('sourcesJar').configure { + dependsOn tasks.named('generateProto') +} + spotbugs { toolVersion = '4.9.2' effort = 'max' diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index 32ec000a..127a4392 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -36,6 +36,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final DataConverter dataConverter; private final Duration maximumTimerInterval; private final ExecutorService workerPool; + private final String appId; // App ID for cross-app routing private final TaskHubSidecarServiceBlockingStub sidecarClient; private final boolean isExecutorServiceManaged; @@ -45,6 +46,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) { this.orchestrationFactories.putAll(builder.orchestrationFactories); this.activityFactories.putAll(builder.activityFactories); + this.appId = builder.appId; Channel sidecarGrpcChannel; if (builder.channel != null) { @@ -128,7 +130,8 @@ public void startAndBlock() { this.orchestrationFactories, this.dataConverter, this.maximumTimerInterval, - logger); + logger, + this.appId); TaskActivityExecutor taskActivityExecutor = new TaskActivityExecutor( this.activityFactories, this.dataConverter, @@ -143,6 +146,9 @@ public void startAndBlock() { RequestCase requestType = workItem.getRequestCase(); if (requestType == RequestCase.ORCHESTRATORREQUEST) { OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest(); + logger.log(Level.FINEST, + String.format("Processing orchestrator request for instance: {0}", + orchestratorRequest.getInstanceId())); // TODO: Error handling this.workerPool.submit(() -> { @@ -159,6 +165,9 @@ public void startAndBlock() { try { this.sidecarClient.completeOrchestratorTask(response); + logger.log(Level.FINEST, + "Completed orchestrator request for instance: {0}", + orchestratorRequest.getInstanceId()); } catch (StatusRuntimeException e) { if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { logger.log(Level.WARNING, @@ -177,7 +186,12 @@ public void startAndBlock() { }); } else if (requestType == RequestCase.ACTIVITYREQUEST) { ActivityRequest activityRequest = workItem.getActivityRequest(); + logger.log(Level.FINEST, + String.format("Processing activity request: %s for instance: %s}", + activityRequest.getName(), + activityRequest.getOrchestrationInstance().getInstanceId())); + // TODO: Error handling this.workerPool.submit(() -> { String output = null; TaskFailureDetails failureDetails = null; @@ -227,7 +241,8 @@ public void startAndBlock() { } else if (requestType == RequestCase.HEALTHPING) { // No-op } else { - logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", + logger.log(Level.WARNING, + "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType); } } diff --git a/client/src/main/java/io/dapr/durabletask/NewOrchestrationInstanceOptions.java b/client/src/main/java/io/dapr/durabletask/NewOrchestrationInstanceOptions.java index 53c97396..bb861af9 100644 --- a/client/src/main/java/io/dapr/durabletask/NewOrchestrationInstanceOptions.java +++ b/client/src/main/java/io/dapr/durabletask/NewOrchestrationInstanceOptions.java @@ -12,6 +12,7 @@ public final class NewOrchestrationInstanceOptions { private String instanceId; private Object input; private Instant startTime; + private String appID; // Target app ID for cross-app workflow routing /** * Default constructor for the {@link NewOrchestrationInstanceOptions} class. @@ -71,6 +72,17 @@ public NewOrchestrationInstanceOptions setStartTime(Instant startTime) { return this; } + /** + * Sets the target app ID for cross-app workflow routing. + * + * @param appID the target app ID for cross-app routing + * @return this {@link NewOrchestrationInstanceOptions} object + */ + public NewOrchestrationInstanceOptions setAppID(String appID) { + this.appID = appID; + return this; + } + /** * Gets the user-specified version of the new orchestration. * @@ -106,4 +118,22 @@ public Object getInput() { public Instant getStartTime() { return this.startTime; } + + /** + * Gets the configured target app ID for cross-app workflow routing. + * + * @return the configured target app ID + */ + public String getAppID() { + return this.appID; + } + + /** + * Checks if an app ID is configured for cross-app routing. + * + * @return true if an app ID is configured, false otherwise + */ + public boolean hasAppID() { + return this.appID != null && !this.appID.isEmpty(); + } } diff --git a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java index 2d4749ee..94bc18a1 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java @@ -36,6 +36,13 @@ public interface TaskOrchestrationContext { */ String getInstanceId(); + /** + * Gets the app ID of the current orchestration instance, if available. + * This is used for cross-app workflow routing. + * @return the app ID of the current orchestration instance, or null if not available + */ + String getAppId(); + /** * Gets the current orchestration time in UTC. * @return the current orchestration time in UTC diff --git a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH index 3e564a57..c06f9bc7 100644 --- a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH +++ b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH @@ -1 +1 @@ -ce184193bcf2d4dcf643bcc934d3ea897c4c8c5c \ No newline at end of file +545663a4db0040bb02c642c27010e337b9be8d31 \ No newline at end of file diff --git a/internal/durabletask-protobuf/protos/orchestrator_service.proto b/internal/durabletask-protobuf/protos/orchestrator_service.proto index 17c40077..c79aab16 100644 --- a/internal/durabletask-protobuf/protos/orchestrator_service.proto +++ b/internal/durabletask-protobuf/protos/orchestrator_service.proto @@ -14,7 +14,7 @@ import "google/protobuf/empty.proto"; message TaskRouter { string source = 1; // orchestrationAppID - string target = 2; // appID + optional string target = 2; // appID } message OrchestrationInstance { @@ -29,6 +29,7 @@ message ActivityRequest { OrchestrationInstance orchestrationInstance = 4; int32 taskId = 5; TraceContext parentTraceContext = 6; + string taskExecutionId = 7; } message ActivityResponse { @@ -99,16 +100,19 @@ message TaskScheduledEvent { google.protobuf.StringValue version = 2; google.protobuf.StringValue input = 3; TraceContext parentTraceContext = 4; + string taskExecutionId = 5; } message TaskCompletedEvent { int32 taskScheduledId = 1; google.protobuf.StringValue result = 2; + string taskExecutionId = 3; } message TaskFailedEvent { int32 taskScheduledId = 1; TaskFailureDetails failureDetails = 2; + string taskExecutionId = 3; } message SubOrchestrationInstanceCreatedEvent { @@ -263,6 +267,7 @@ message ScheduleTaskAction { google.protobuf.StringValue version = 2; google.protobuf.StringValue input = 3; optional TaskRouter router = 4; + string taskExecutionId = 5; } message CreateSubOrchestrationAction { From 3302911dc5b1e748dbd663e1264336ab210c3200 Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Thu, 3 Jul 2025 18:25:38 -0500 Subject: [PATCH 05/11] cross app activity calls works Signed-off-by: Cassandra Coyle --- .../DurableTaskGrpcWorkerBuilder.java | 292 +++++++++--------- .../dapr/durabletask/OrchestrationRunner.java | 3 +- .../TaskOrchestrationExecutor.java | 83 +++-- 3 files changed, 220 insertions(+), 158 deletions(-) diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java index 8ef0df8d..cb13296c 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java @@ -1,138 +1,154 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package io.dapr.durabletask; - -import io.grpc.Channel; - -import java.time.Duration; -import java.util.HashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -/** - * Builder object for constructing customized {@link DurableTaskGrpcWorker} instances. - */ -public final class DurableTaskGrpcWorkerBuilder { - final HashMap orchestrationFactories = new HashMap<>(); - final HashMap activityFactories = new HashMap<>(); - int port; - Channel channel; - DataConverter dataConverter; - Duration maximumTimerInterval; - ExecutorService executorService; - - /** - * Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}. - * - * @param factory an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker} - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) { - String key = factory.getName(); - if (key == null || key.length() == 0) { - throw new IllegalArgumentException("A non-empty task orchestration name is required."); - } - - if (this.orchestrationFactories.containsKey(key)) { - throw new IllegalArgumentException( - String.format("A task orchestration factory named %s is already registered.", key)); - } - - this.orchestrationFactories.put(key, factory); - return this; - } - - /** - * Adds an activity factory to be used by the constructed {@link DurableTaskGrpcWorker}. - * - * @param factory an activity factory to be used by the constructed {@link DurableTaskGrpcWorker} - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder addActivity(TaskActivityFactory factory) { - // TODO: Input validation - String key = factory.getName(); - if (key == null || key.length() == 0) { - throw new IllegalArgumentException("A non-empty task activity name is required."); - } - - if (this.activityFactories.containsKey(key)) { - throw new IllegalArgumentException( - String.format("A task activity factory named %s is already registered.", key)); - } - - this.activityFactories.put(key, factory); - return this; - } - - /** - * Sets the gRPC channel to use for communicating with the sidecar process. - *

- * This builder method allows you to provide your own gRPC channel for communicating with the Durable Task sidecar - * endpoint. Channels provided using this method won't be closed when the worker is closed. - * Rather, the caller remains responsible for shutting down the channel after disposing the worker. - *

- * If not specified, a gRPC channel will be created automatically for each constructed - * {@link DurableTaskGrpcWorker}. - * - * @param channel the gRPC channel to use - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder grpcChannel(Channel channel) { - this.channel = channel; - return this; - } - - /** - * Sets the gRPC endpoint port to connect to. If not specified, the default Durable Task port number will be used. - * - * @param port the gRPC endpoint port to connect to - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder port(int port) { - this.port = port; - return this; - } - - /** - * Sets the {@link DataConverter} to use for converting serializable data payloads. - * - * @param dataConverter the {@link DataConverter} to use for converting serializable data payloads - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder dataConverter(DataConverter dataConverter) { - this.dataConverter = dataConverter; - return this; - } - - /** - * Sets the maximum timer interval. If not specified, the default maximum timer interval duration will be used. - * The default maximum timer interval duration is 3 days. - * - * @param maximumTimerInterval the maximum timer interval - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerInterval) { - this.maximumTimerInterval = maximumTimerInterval; - return this; - } - - /** - * Sets the executor service that will be used to execute threads. - * - * @param executorService {@link ExecutorService}. - * @return this builder object. - */ - public DurableTaskGrpcWorkerBuilder withExecutorService(ExecutorService executorService) { - this.executorService = executorService; - return this; - } - - /** - * Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object. - * @return a new {@link DurableTaskGrpcWorker} object - */ - public DurableTaskGrpcWorker build() { - return new DurableTaskGrpcWorker(this); - } -} +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package io.dapr.durabletask; + +import io.grpc.Channel; + +import java.time.Duration; +import java.util.HashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Builder object for constructing customized {@link DurableTaskGrpcWorker} instances. + */ +public final class DurableTaskGrpcWorkerBuilder { + final HashMap orchestrationFactories = new HashMap<>(); + final HashMap activityFactories = new HashMap<>(); + int port; + Channel channel; + DataConverter dataConverter; + Duration maximumTimerInterval; + ExecutorService executorService; + String appId; // App ID for cross-app routing + + /** + * Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}. + * + * @param factory an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker} + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) { + String key = factory.getName(); + if (key == null || key.length() == 0) { + throw new IllegalArgumentException("A non-empty task orchestration name is required."); + } + + if (this.orchestrationFactories.containsKey(key)) { + throw new IllegalArgumentException( + String.format("A task orchestration factory named %s is already registered.", key)); + } + + this.orchestrationFactories.put(key, factory); + return this; + } + + /** + * Adds an activity factory to be used by the constructed {@link DurableTaskGrpcWorker}. + * + * @param factory an activity factory to be used by the constructed {@link DurableTaskGrpcWorker} + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder addActivity(TaskActivityFactory factory) { + // TODO: Input validation + String key = factory.getName(); + if (key == null || key.length() == 0) { + throw new IllegalArgumentException("A non-empty task activity name is required."); + } + + if (this.activityFactories.containsKey(key)) { + throw new IllegalArgumentException( + String.format("A task activity factory named %s is already registered.", key)); + } + + this.activityFactories.put(key, factory); + return this; + } + + /** + * Sets the gRPC channel to use for communicating with the sidecar process. + *

+ * This builder method allows you to provide your own gRPC channel for communicating with the Durable Task sidecar + * endpoint. Channels provided using this method won't be closed when the worker is closed. + * Rather, the caller remains responsible for shutting down the channel after disposing the worker. + *

+ * If not specified, a gRPC channel will be created automatically for each constructed + * {@link DurableTaskGrpcWorker}. + * + * @param channel the gRPC channel to use + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder grpcChannel(Channel channel) { + this.channel = channel; + return this; + } + + /** + * Sets the gRPC endpoint port to connect to. If not specified, the default Durable Task port number will be used. + * + * @param port the gRPC endpoint port to connect to + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder port(int port) { + this.port = port; + return this; + } + + /** + * Sets the {@link DataConverter} to use for converting serializable data payloads. + * + * @param dataConverter the {@link DataConverter} to use for converting serializable data payloads + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder dataConverter(DataConverter dataConverter) { + this.dataConverter = dataConverter; + return this; + } + + /** + * Sets the maximum timer interval. If not specified, the default maximum timer interval duration will be used. + * The default maximum timer interval duration is 3 days. + * + * @param maximumTimerInterval the maximum timer interval + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerInterval) { + this.maximumTimerInterval = maximumTimerInterval; + return this; + } + + /** + * Sets the executor service that will be used to execute threads. + * + * @param executorService {@link ExecutorService}. + * @return this builder object. + */ + public DurableTaskGrpcWorkerBuilder withExecutorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + + /** + * Sets the app ID for cross-app workflow routing. + *

+ * This app ID is used to identify this worker in cross-app routing scenarios. + * It should match the app ID configured in the Dapr sidecar. + *

+ * + * @param appId the app ID for this worker + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder appId(String appId) { + this.appId = appId; + return this; + } + + /** + * Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object. + * @return a new {@link DurableTaskGrpcWorker} object + */ + public DurableTaskGrpcWorker build() { + return new DurableTaskGrpcWorker(this); + } +} diff --git a/client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java b/client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java index de6530f9..4f327c2a 100644 --- a/client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java +++ b/client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java @@ -129,7 +129,8 @@ public TaskOrchestration create() { orchestrationFactories, new JacksonDataConverter(), DEFAULT_MAXIMUM_TIMER_INTERVAL, - logger); + logger, + null); // No app ID for static runner // TODO: Error handling TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( diff --git a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index b99f12ec..9d15765b 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -30,16 +30,19 @@ final class TaskOrchestrationExecutor { private final DataConverter dataConverter; private final Logger logger; private final Duration maximumTimerInterval; + private final String appId; public TaskOrchestrationExecutor( HashMap orchestrationFactories, DataConverter dataConverter, Duration maximumTimerInterval, - Logger logger) { + Logger logger, + String appId) { this.orchestrationFactories = orchestrationFactories; this.dataConverter = dataConverter; this.maximumTimerInterval = maximumTimerInterval; this.logger = logger; + this.appId = appId; // extracted from router } public TaskOrchestratorResult execute(List pastEvents, List newEvents) { @@ -51,6 +54,7 @@ public TaskOrchestratorResult execute(List pastEvents, List pendingActions = new LinkedHashMap<>(); @@ -137,6 +142,11 @@ private void setInstanceId(String instanceId) { this.instanceId = instanceId; } + @Override + public String getAppId() { + return this.appId; + } + @Override public Instant getCurrentInstant() { // TODO: Throw if instant is null @@ -270,12 +280,37 @@ public Task callActivity( scheduleTaskBuilder.setInput(StringValue.of(serializedInput)); } + // Add router information for cross-app routing + // Router always has a source app ID from EXECUTIONSTARTED event + TaskRouter.Builder routerBuilder = TaskRouter.newBuilder() + .setSource(this.appId); + + // Add target app ID if specified in options + if (options != null && options.hasAppID()) { + String targetAppId = options.getAppID(); + routerBuilder.setTarget(targetAppId); + this.logger.fine(() -> String.format( + "cross app routing detected: source=%s, target=%s", + this.appId, targetAppId)); + } + TaskRouter router = routerBuilder.build(); + scheduleTaskBuilder.setRouter(router); TaskFactory taskFactory = () -> { int id = this.sequenceNumber++; - this.pendingActions.put(id, OrchestratorAction.newBuilder() + + ScheduleTaskAction scheduleTaskAction = scheduleTaskBuilder.build(); + OrchestratorAction.Builder actionBuilder = OrchestratorAction.newBuilder() .setId(id) - .setScheduleTask(scheduleTaskBuilder) - .build()); + .setScheduleTask(scheduleTaskBuilder); + TaskRouter.Builder actionRouterBuilder = TaskRouter.newBuilder() + .setSource(this.appId); + if (options != null && options.hasAppID()) { + String targetAppId = options.getAppID(); + actionRouterBuilder.setTarget(targetAppId); + } + + actionBuilder.setRouter(actionRouterBuilder.build()); + this.pendingActions.put(id, actionBuilder.build()); if (!this.isReplaying) { this.logger.fine(() -> String.format( @@ -337,11 +372,11 @@ public void sendEvent(String instanceId, String eventName, Object eventData) { if (serializedEventData != null){ builder.setData(StringValue.of(serializedEventData)); } + OrchestratorAction.Builder actionBuilder = OrchestratorAction.newBuilder() + .setId(id) + .setSendEvent(builder); - this.pendingActions.put(id, OrchestratorAction.newBuilder() - .setId(id) - .setSendEvent(builder) - .build()); + this.pendingActions.put(id, actionBuilder.build()); if (!this.isReplaying) { this.logger.fine(() -> String.format( @@ -379,6 +414,7 @@ public Task callSubOrchestrator( } createSubOrchestrationActionBuilder.setInstanceId(instanceId); + // TODO: @cicoyle - add suborchestration cross app logic here when its supported TaskFactory taskFactory = () -> { int id = this.sequenceNumber++; this.pendingActions.put(id, OrchestratorAction.newBuilder() @@ -821,33 +857,42 @@ private void processEvent(HistoryEvent e) { if (this.isSuspended && !overrideSuspension) { this.handleEventWhileSuspended(e); } else { + this.logger.fine(() -> this.instanceId + ": Processing event: " + e.getEventTypeCase()); switch (e.getEventTypeCase()) { case ORCHESTRATORSTARTED: Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp()); this.setCurrentInstant(instant); + this.logger.fine(() -> this.instanceId + ": Workflow orchestrator started"); break; case ORCHESTRATORCOMPLETED: - // No action + // No action needed + this.logger.fine(() -> this.instanceId + ": Workflow orchestrator completed"); break; case EXECUTIONSTARTED: - ExecutionStartedEvent startedEvent = e.getExecutionStarted(); - String name = startedEvent.getName(); - this.setName(name); - String instanceId = startedEvent.getOrchestrationInstance().getInstanceId(); - this.setInstanceId(instanceId); - String input = startedEvent.getInput().getValue(); - this.setInput(input); - TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name); + ExecutionStartedEvent executionStarted = e.getExecutionStarted(); + this.setName(executionStarted.getName()); + this.setInput(executionStarted.getInput().getValue()); + this.setInstanceId(executionStarted.getOrchestrationInstance().getInstanceId()); + this.logger.fine(() -> this.instanceId + ": Workflow execution started"); + this.appId = e.getRouter().getSource(); + + // Create and invoke the workflow orchestrator + TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(executionStarted.getName()); if (factory == null) { // Try getting the default orchestrator factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*"); } // TODO: Throw if the factory is null (orchestration by that name doesn't exist) + if (factory == null) { + throw new IllegalStateException("No factory found for orchestrator: " + executionStarted.getName()); + } + TaskOrchestration orchestrator = factory.create(); orchestrator.run(this); break; -// case EXECUTIONCOMPLETED: -// break; + case EXECUTIONCOMPLETED: + this.logger.fine(() -> this.instanceId + ": Workflow execution completed"); + break; // case EXECUTIONFAILED: // break; case EXECUTIONTERMINATED: From 2c5c5a3a84ab80fd0347886bb8884bd4fa9a7b20 Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Thu, 24 Jul 2025 11:17:59 -0500 Subject: [PATCH 06/11] builder pattern + use latest protobuf commit Signed-off-by: Cassandra Coyle --- client/build.gradle | 7 +- .../java/io/dapr/durabletask/TaskOptions.java | 101 ++++++++++++++---- .../TaskOrchestrationExecutor.java | 10 +- .../ErrorHandlingIntegrationTests.java | 8 +- .../io/dapr/durabletask/IntegrationTests.java | 8 +- .../io/dapr/durabletask/TaskOptionsTest.java | 69 ++++++++++-- .../java/com/functions/ParallelFunctions.java | 6 +- .../java/com/functions/RetriableTask.java | 6 +- .../PROTO_SOURCE_COMMIT_HASH | 2 +- internal/durabletask-protobuf/README.md | 4 +- .../protos/orchestrator_service.proto | 5 +- .../java/com/functions/ParallelFunctions.java | 6 +- .../java/com/functions/RetriableTask.java | 6 +- 13 files changed, 176 insertions(+), 62 deletions(-) diff --git a/client/build.gradle b/client/build.gradle index 1225bc2e..5b305ec9 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -45,16 +45,15 @@ dependencies { // Netty dependencies for TLS implementation "io.grpc:grpc-netty-shaded:${grpcVersion}" - implementation "io.netty:netty-handler:4.1.94.Final" + implementation 'io.netty:netty-handler:4.1.119.Final' implementation "io.netty:netty-tcnative-boringssl-static:2.0.59.Final" // Add Netty dependencies to test classpath testImplementation "io.grpc:grpc-netty-shaded:${grpcVersion}" - testImplementation "io.netty:netty-handler:4.1.94.Final" + testImplementation 'io.netty:netty-handler:4.1.119.Final' testImplementation "io.netty:netty-tcnative-boringssl-static:2.0.59.Final" - testImplementation 'org.bouncycastle:bcprov-jdk15on:1.70' - testImplementation 'org.bouncycastle:bcpkix-jdk15on:1.70' + testImplementation 'org.bouncycastle:bcprov-jdk15on:1.78' } compileJava { diff --git a/client/src/main/java/io/dapr/durabletask/TaskOptions.java b/client/src/main/java/io/dapr/durabletask/TaskOptions.java index fcb34c33..d5d8a739 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOptions.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOptions.java @@ -10,49 +10,53 @@ public final class TaskOptions { private final RetryHandler retryHandler; private final String appID; - public TaskOptions(RetryPolicy retryPolicy, RetryHandler retryHandler, String appID) { - this.retryPolicy = retryPolicy; - this.retryHandler = retryHandler; - this.appID = appID; + private TaskOptions(Builder builder) { + this.retryPolicy = builder.retryPolicy; + this.retryHandler = builder.retryHandler; + this.appID = builder.appID; } - public TaskOptions(RetryPolicy retryPolicy, RetryHandler retryHandler) { - this.retryPolicy = retryPolicy; - this.retryHandler = retryHandler; - this.appID = null; + /** + * Creates a new builder for {@code TaskOptions}. + * @return a new builder instance + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Creates a new {@code TaskOptions} object with default values. + * @return a new TaskOptions instance with no configuration + */ + public static TaskOptions create() { + return new Builder().build(); } /** * Creates a new {@code TaskOptions} object from a {@link RetryPolicy}. * @param retryPolicy the retry policy to use in the new {@code TaskOptions} object. + * @return a new TaskOptions instance with the specified retry policy */ - public TaskOptions(RetryPolicy retryPolicy) { - this(retryPolicy, null, null); + public static TaskOptions withRetryPolicy(RetryPolicy retryPolicy) { + return new Builder().retryPolicy(retryPolicy).build(); } /** * Creates a new {@code TaskOptions} object from a {@link RetryHandler}. * @param retryHandler the retry handler to use in the new {@code TaskOptions} object. + * @return a new TaskOptions instance with the specified retry handler */ - public TaskOptions(RetryHandler retryHandler) { - this(null, retryHandler, null); + public static TaskOptions withRetryHandler(RetryHandler retryHandler) { + return new Builder().retryHandler(retryHandler).build(); } /** * Creates a new {@code TaskOptions} object with the specified app ID. * @param appID the app ID to use for cross-app workflow routing + * @return a new TaskOptions instance with the specified app ID */ - public TaskOptions(String appID) { - this(null, null, appID); - } - - /** - * Creates a new {@code TaskOptions} object with the specified retry policy and app ID. - * @param retryPolicy the retry policy to use - * @param appID the app ID to use for cross-app workflow routing - */ - public TaskOptions(RetryPolicy retryPolicy, String appID) { - this(retryPolicy, null, appID); + public static TaskOptions withAppID(String appID) { + return new Builder().appID(appID).build(); } boolean hasRetryPolicy() { @@ -90,4 +94,55 @@ public String getAppID() { boolean hasAppID() { return this.appID != null && !this.appID.isEmpty(); } + + /** + * Builder for creating {@code TaskOptions} instances. + */ + public static final class Builder { + private RetryPolicy retryPolicy; + private RetryHandler retryHandler; + private String appID; + + private Builder() { + // Private constructor -enforces using TaskOptions.builder() + } + + /** + * Sets the retry policy for the task options. + * @param retryPolicy the retry policy to use + * @return this builder instance for method chaining + */ + public Builder retryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + return this; + } + + /** + * Sets the retry handler for the task options. + * @param retryHandler the retry handler to use + * @return this builder instance for method chaining + */ + public Builder retryHandler(RetryHandler retryHandler) { + this.retryHandler = retryHandler; + return this; + } + + /** + * Sets the app ID for cross-app workflow routing. + * @param appID the app ID to use + * @return this builder instance for method chaining + */ + public Builder appID(String appID) { + this.appID = appID; + return this; + } + + /** + * Builds a new {@code TaskOptions} instance with the configured values. + * @return a new TaskOptions instance + */ + public TaskOptions build() { + return new TaskOptions(this); + } + } } diff --git a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index 33d78750..0a60b5ea 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -283,12 +283,12 @@ public Task callActivity( // Add router information for cross-app routing // Router always has a source app ID from EXECUTIONSTARTED event TaskRouter.Builder routerBuilder = TaskRouter.newBuilder() - .setSource(this.appId); + .setSourceAppID(this.appId); // Add target app ID if specified in options if (options != null && options.hasAppID()) { String targetAppId = options.getAppID(); - routerBuilder.setTarget(targetAppId); + routerBuilder.setTargetAppID(targetAppId); this.logger.fine(() -> String.format( "cross app routing detected: source=%s, target=%s", this.appId, targetAppId)); @@ -303,10 +303,10 @@ public Task callActivity( .setId(id) .setScheduleTask(scheduleTaskBuilder); TaskRouter.Builder actionRouterBuilder = TaskRouter.newBuilder() - .setSource(this.appId); + .setSourceAppID(this.appId); if (options != null && options.hasAppID()) { String targetAppId = options.getAppID(); - actionRouterBuilder.setTarget(targetAppId); + actionRouterBuilder.setTargetAppID(targetAppId); } actionBuilder.setRouter(actionRouterBuilder.build()); @@ -874,7 +874,7 @@ private void processEvent(HistoryEvent e) { this.setInput(executionStarted.getInput().getValue()); this.setInstanceId(executionStarted.getOrchestrationInstance().getInstanceId()); this.logger.fine(() -> this.instanceId + ": Workflow execution started"); - this.appId = e.getRouter().getSource(); + this.appId = e.getRouter().getSourceAppID(); // Create and invoke the workflow orchestrator TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(executionStarted.getName()); diff --git a/client/src/test/java/io/dapr/durabletask/ErrorHandlingIntegrationTests.java b/client/src/test/java/io/dapr/durabletask/ErrorHandlingIntegrationTests.java index 4d78bc97..4a178509 100644 --- a/client/src/test/java/io/dapr/durabletask/ErrorHandlingIntegrationTests.java +++ b/client/src/test/java/io/dapr/durabletask/ErrorHandlingIntegrationTests.java @@ -112,7 +112,7 @@ public void retryActivityFailures(int maxNumberOfAttempts) throws TimeoutExcepti ctx.callActivity( "BustedActivity", null, - new TaskOptions(retryPolicy)).await(); + TaskOptions.withRetryPolicy(retryPolicy)).await(); }); } @@ -125,7 +125,7 @@ public void retryActivityFailuresWithCustomLogic(int maxNumberOfAttempts) throws // Run the test and get back the details of the last failure this.retryOnFailuresCoreTest(maxNumberOfAttempts, maxNumberOfAttempts, ctx -> { RetryHandler retryHandler = getCommonRetryHandler(retryHandlerCalls, maxNumberOfAttempts); - TaskOptions options = new TaskOptions(retryHandler); + TaskOptions options = TaskOptions.withRetryHandler(retryHandler); ctx.callActivity("BustedActivity", null, options).await(); }); @@ -194,7 +194,7 @@ public void retrySubOrchestratorFailures(int maxNumberOfAttempts) throws Timeout "BustedSubOrchestrator", null, null, - new TaskOptions(retryPolicy)).await(); + TaskOptions.withRetryPolicy(retryPolicy)).await(); }); } @@ -207,7 +207,7 @@ public void retrySubOrchestrationFailuresWithCustomLogic(int maxNumberOfAttempts // Run the test and get back the details of the last failure this.retryOnFailuresCoreTest(maxNumberOfAttempts, maxNumberOfAttempts, ctx -> { RetryHandler retryHandler = getCommonRetryHandler(retryHandlerCalls, maxNumberOfAttempts); - TaskOptions options = new TaskOptions(retryHandler); + TaskOptions options = TaskOptions.withRetryHandler(retryHandler); ctx.callSubOrchestrator("BustedSubOrchestrator", null, null, options).await(); }); diff --git a/client/src/test/java/io/dapr/durabletask/IntegrationTests.java b/client/src/test/java/io/dapr/durabletask/IntegrationTests.java index 033d8691..8043bc6e 100644 --- a/client/src/test/java/io/dapr/durabletask/IntegrationTests.java +++ b/client/src/test/java/io/dapr/durabletask/IntegrationTests.java @@ -1370,7 +1370,7 @@ void activityAllOf() throws IOException, TimeoutException { final int activityCount = 10; final AtomicBoolean throwException = new AtomicBoolean(true); final RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(5)); - final TaskOptions taskOptions = new TaskOptions(retryPolicy); + final TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); DurableTaskGrpcWorker worker = this.createWorkerBuilder() .addOrchestrator(orchestratorName, ctx -> { @@ -1428,7 +1428,7 @@ void activityAllOfException() throws IOException, TimeoutException { final String result = "test fail"; final int activityMiddle = 5; final RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(5)); - final TaskOptions taskOptions = new TaskOptions(retryPolicy); + final TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); DurableTaskGrpcWorker worker = this.createWorkerBuilder() .addOrchestrator(orchestratorName, ctx -> { @@ -1491,7 +1491,7 @@ void activityAnyOf() throws IOException, TimeoutException { final int activityCount = 10; final AtomicBoolean throwException = new AtomicBoolean(true); final RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(5)); - final TaskOptions taskOptions = new TaskOptions(retryPolicy); + final TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); DurableTaskGrpcWorker worker = this.createWorkerBuilder() .addOrchestrator(orchestratorName, ctx -> { @@ -1585,7 +1585,7 @@ public void taskExecutionIdTest() { var orchestratorName = "test-task-execution-id"; var retryActivityName = "RetryN"; final RetryPolicy retryPolicy = new RetryPolicy(4, Duration.ofSeconds(3)); - final TaskOptions taskOptions = new TaskOptions(retryPolicy); + final TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); var execMap = new HashMap(); diff --git a/client/src/test/java/io/dapr/durabletask/TaskOptionsTest.java b/client/src/test/java/io/dapr/durabletask/TaskOptionsTest.java index 007b258d..e258d7f4 100644 --- a/client/src/test/java/io/dapr/durabletask/TaskOptionsTest.java +++ b/client/src/test/java/io/dapr/durabletask/TaskOptionsTest.java @@ -25,7 +25,7 @@ public class TaskOptionsTest { @Test void taskOptionsWithAppID() { - TaskOptions options = new TaskOptions("app1"); + TaskOptions options = TaskOptions.withAppID("app1"); assertTrue(options.hasAppID()); assertEquals("app1", options.getAppID()); @@ -36,7 +36,10 @@ void taskOptionsWithAppID() { @Test void taskOptionsWithRetryPolicyAndAppID() { RetryPolicy retryPolicy = new RetryPolicy(3, Duration.ofSeconds(1)); - TaskOptions options = new TaskOptions(retryPolicy, null, "app2"); + TaskOptions options = TaskOptions.builder() + .retryPolicy(retryPolicy) + .appID("app2") + .build(); assertTrue(options.hasAppID()); assertEquals("app2", options.getAppID()); @@ -53,7 +56,10 @@ public boolean handle(RetryContext context) { return context.getLastAttemptNumber() < 2; } }; - TaskOptions options = new TaskOptions(null, retryHandler, "app3"); + TaskOptions options = TaskOptions.builder() + .retryHandler(retryHandler) + .appID("app3") + .build(); assertTrue(options.hasAppID()); assertEquals("app3", options.getAppID()); @@ -64,7 +70,7 @@ public boolean handle(RetryContext context) { @Test void taskOptionsWithoutAppID() { - TaskOptions options = new TaskOptions(null, null, null); + TaskOptions options = TaskOptions.create(); assertFalse(options.hasAppID()); assertNull(options.getAppID()); @@ -72,7 +78,7 @@ void taskOptionsWithoutAppID() { @Test void taskOptionsWithEmptyAppID() { - TaskOptions options = new TaskOptions(""); + TaskOptions options = TaskOptions.withAppID(""); assertFalse(options.hasAppID()); assertEquals("", options.getAppID()); @@ -80,9 +86,60 @@ void taskOptionsWithEmptyAppID() { @Test void taskOptionsWithNullAppID() { - TaskOptions options = new TaskOptions(null, null, null); + TaskOptions options = TaskOptions.builder().appID(null).build(); assertFalse(options.hasAppID()); assertNull(options.getAppID()); } + + @Test + void taskOptionsWithRetryPolicy() { + RetryPolicy retryPolicy = new RetryPolicy(5, Duration.ofMinutes(1)); + TaskOptions options = TaskOptions.withRetryPolicy(retryPolicy); + + assertTrue(options.hasRetryPolicy()); + assertEquals(retryPolicy, options.getRetryPolicy()); + assertFalse(options.hasRetryHandler()); + assertFalse(options.hasAppID()); + } + + @Test + void taskOptionsWithRetryHandler() { + RetryHandler retryHandler = new RetryHandler() { + @Override + public boolean handle(RetryContext context) { + return context.getLastAttemptNumber() < 3; + } + }; + TaskOptions options = TaskOptions.withRetryHandler(retryHandler); + + assertTrue(options.hasRetryHandler()); + assertEquals(retryHandler, options.getRetryHandler()); + assertFalse(options.hasRetryPolicy()); + assertFalse(options.hasAppID()); + } + + @Test + void taskOptionsWithBuilderChaining() { + RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(30)); + RetryHandler retryHandler = new RetryHandler() { + @Override + public boolean handle(RetryContext context) { + return context.getLastAttemptNumber() < 1; + } + }; + + TaskOptions options = TaskOptions.builder() + .retryPolicy(retryPolicy) + .retryHandler(retryHandler) + .appID("test-app") + .build(); + + assertTrue(options.hasRetryPolicy()); + assertEquals(retryPolicy, options.getRetryPolicy()); + assertTrue(options.hasRetryHandler()); + assertEquals(retryHandler, options.getRetryHandler()); + assertTrue(options.hasAppID()); + assertEquals("test-app", options.getAppID()); + } } \ No newline at end of file diff --git a/endtoendtests/src/main/java/com/functions/ParallelFunctions.java b/endtoendtests/src/main/java/com/functions/ParallelFunctions.java index 2a744e73..852b3be9 100644 --- a/endtoendtests/src/main/java/com/functions/ParallelFunctions.java +++ b/endtoendtests/src/main/java/com/functions/ParallelFunctions.java @@ -33,7 +33,7 @@ public HttpResponseMessage startParallelOrchestration( public List parallelOrchestratorSad( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(5)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); List> tasks = new ArrayList<>(); tasks.add(ctx.callActivity("AppendSad", "Input1", taskOptions, String.class)); tasks.add(ctx.callActivity("AppendSad", "Input2", taskOptions, String.class)); @@ -82,7 +82,7 @@ public HttpResponseMessage startParallelAnyOf( public Object parallelAnyOf( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(5)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); List> tasks = new ArrayList<>(); tasks.add(ctx.callActivity("AppendHappy", "AnyOf1", taskOptions, String.class)); tasks.add(ctx.callActivity("AppendHappy", "AnyOf2", String.class)); @@ -110,7 +110,7 @@ public List parallelCatchException( try { List> tasks = new ArrayList<>(); RetryPolicy policy = new RetryPolicy(2, Duration.ofSeconds(1)); - TaskOptions options = new TaskOptions(policy); + TaskOptions options = TaskOptions.withRetryPolicy(policy); tasks.add(ctx.callActivity("AlwaysException", "Input1", options, String.class)); tasks.add(ctx.callActivity("AppendHappy", "Input2", options, String.class)); return ctx.allOf(tasks).await(); diff --git a/endtoendtests/src/main/java/com/functions/RetriableTask.java b/endtoendtests/src/main/java/com/functions/RetriableTask.java index 95377f3b..9b267ef2 100644 --- a/endtoendtests/src/main/java/com/functions/RetriableTask.java +++ b/endtoendtests/src/main/java/com/functions/RetriableTask.java @@ -42,7 +42,7 @@ public HttpResponseMessage retriableOrchestration( public String retriableTask( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(1)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); return ctx.callActivity("Append", "Test-Input", taskOptions, String.class).await(); } @@ -88,7 +88,7 @@ public HttpResponseMessage retriableOrchestrationSuccess( public String retriableTaskFail( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(1)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); return ctx.callActivity("AppendFail", "Test-Input", taskOptions, String.class).await(); } @@ -96,7 +96,7 @@ public String retriableTaskFail( public String retriableTaskSuccess( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(3, Duration.ofSeconds(1)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); return ctx.callActivity("AppendSuccess", "Test-Input", taskOptions, String.class).await(); } diff --git a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH index c06f9bc7..d8ec2af9 100644 --- a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH +++ b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH @@ -1 +1 @@ -545663a4db0040bb02c642c27010e337b9be8d31 \ No newline at end of file +4b86756497d875b97f9a91051781b5711c1e4fa6 \ No newline at end of file diff --git a/internal/durabletask-protobuf/README.md b/internal/durabletask-protobuf/README.md index 4c3f2986..7193317d 100644 --- a/internal/durabletask-protobuf/README.md +++ b/internal/durabletask-protobuf/README.md @@ -21,4 +21,6 @@ If you need to manually update the proto files, you can run: ```bash ./gradlew downloadProtoFiles -PprotoBranch= -``` \ No newline at end of file +``` + +See [here for more instructions.](../../CONTRIBUTING.md#updating-protobuf-definitions) \ No newline at end of file diff --git a/internal/durabletask-protobuf/protos/orchestrator_service.proto b/internal/durabletask-protobuf/protos/orchestrator_service.proto index c79aab16..41fad9de 100644 --- a/internal/durabletask-protobuf/protos/orchestrator_service.proto +++ b/internal/durabletask-protobuf/protos/orchestrator_service.proto @@ -13,8 +13,8 @@ import "google/protobuf/wrappers.proto"; import "google/protobuf/empty.proto"; message TaskRouter { - string source = 1; // orchestrationAppID - optional string target = 2; // appID + string sourceAppID = 1; + optional string targetAppID = 2; } message OrchestrationInstance { @@ -64,6 +64,7 @@ message ParentInstanceInfo { google.protobuf.StringValue name = 2; google.protobuf.StringValue version = 3; OrchestrationInstance orchestrationInstance = 4; + optional string appID = 5; } message TraceContext { diff --git a/samples-azure-functions/src/main/java/com/functions/ParallelFunctions.java b/samples-azure-functions/src/main/java/com/functions/ParallelFunctions.java index f7797705..74f4e399 100644 --- a/samples-azure-functions/src/main/java/com/functions/ParallelFunctions.java +++ b/samples-azure-functions/src/main/java/com/functions/ParallelFunctions.java @@ -33,7 +33,7 @@ public HttpResponseMessage startParallelOrchestration( public List parallelOrchestratorSad( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(5)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); List> tasks = new ArrayList<>(); tasks.add(ctx.callActivity("AppendSad", "Input1", taskOptions, String.class)); tasks.add(ctx.callActivity("AppendSad", "Input2", taskOptions, String.class)); @@ -82,7 +82,7 @@ public HttpResponseMessage startParallelAnyOf( public Object parallelAnyOf( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(5)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); List> tasks = new ArrayList<>(); tasks.add(ctx.callActivity("AppendHappy", "AnyOf1", taskOptions, String.class)); tasks.add(ctx.callActivity("AppendHappy", "AnyOf2", String.class)); @@ -110,7 +110,7 @@ public List parallelCatchException( try { List> tasks = new ArrayList<>(); RetryPolicy policy = new RetryPolicy(2, Duration.ofSeconds(1)); - TaskOptions options = new TaskOptions(policy); + TaskOptions options = TaskOptions.withRetryPolicy(policy); tasks.add(ctx.callActivity("AlwaysException", "Input1", options, String.class)); tasks.add(ctx.callActivity("AppendHappy", "Input2", options, String.class)); return ctx.allOf(tasks).await(); diff --git a/samples-azure-functions/src/main/java/com/functions/RetriableTask.java b/samples-azure-functions/src/main/java/com/functions/RetriableTask.java index 95377f3b..9b267ef2 100644 --- a/samples-azure-functions/src/main/java/com/functions/RetriableTask.java +++ b/samples-azure-functions/src/main/java/com/functions/RetriableTask.java @@ -42,7 +42,7 @@ public HttpResponseMessage retriableOrchestration( public String retriableTask( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(1)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); return ctx.callActivity("Append", "Test-Input", taskOptions, String.class).await(); } @@ -88,7 +88,7 @@ public HttpResponseMessage retriableOrchestrationSuccess( public String retriableTaskFail( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(1)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); return ctx.callActivity("AppendFail", "Test-Input", taskOptions, String.class).await(); } @@ -96,7 +96,7 @@ public String retriableTaskFail( public String retriableTaskSuccess( @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { RetryPolicy retryPolicy = new RetryPolicy(3, Duration.ofSeconds(1)); - TaskOptions taskOptions = new TaskOptions(retryPolicy); + TaskOptions taskOptions = TaskOptions.withRetryPolicy(retryPolicy); return ctx.callActivity("AppendSuccess", "Test-Input", taskOptions, String.class).await(); } From 2f417d0cfd830f6e0372d78ffbf22d4b710c4c15 Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Thu, 24 Jul 2025 11:43:56 -0500 Subject: [PATCH 07/11] deprecate java 8, use java 11 to align with java-sdk Signed-off-by: Cassandra Coyle --- .github/workflows/build-validation.yml | 12 ------------ CONTRIBUTING.md | 15 +++++++++++++++ README.md | 5 +++++ azurefunctions/README.md | 2 +- client/build.gradle | 18 +++++------------- 5 files changed, 26 insertions(+), 26 deletions(-) diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index e18f646a..949e9192 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -25,17 +25,6 @@ jobs: uses: actions/setup-java@v2 with: java-version: '11' - distribution: 'microsoft' - - - name: set JDK_11 environment variable test compiling and running - env: - ACTIONS_ALLOW_UNSECURE_COMMANDS: true - run: echo ::set-env name=JDK_11::$(echo $JAVA_HOME) - - - name: Set up JDK 8 - uses: actions/setup-java@v2 - with: - java-version: '8' distribution: 'temurin' - name: Setup Gradle @@ -57,7 +46,6 @@ jobs: - name: Run Unit Tests with Gradle run: | - export JAVA_HOME=$JDK_11 ./gradlew clean test || echo "UNIT_TEST_FAILED=true" >> $GITHUB_ENV continue-on-error: true diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index df9c5ec3..07dd5f5f 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -9,6 +9,21 @@ Build & test: ./gradlew build ``` +## Verify Build Components + +To ensure all build components work correctly after making changes: + +```shell +# Build without tests +./gradlew build -x test + +# Run unit tests +./gradlew test + +# Run SpotBugs +./gradlew spotbugsMain +``` + ## Updating Protobuf Definitions When updating the protobuf definitions in `internal/durabletask-protobuf/protos/orchestrator_service.proto`: diff --git a/README.md b/README.md index 6c33c3e5..e3e59edd 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,11 @@ This repo contains the Java SDK for the Durable Task Framework as well as classes and annotations to support running [Azure Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview?tabs=java) for Java. With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Java code. +## Requirements + +- **Java 11 or higher** - This SDK requires Java 11 as the minimum version +- **Gradle** - The project uses [Gradle 7.6.4](gradle/wrapper/gradle-wrapper.properties) + ### Simple, fault-tolerant sequences ```java diff --git a/azurefunctions/README.md b/azurefunctions/README.md index 132daa00..43ff0ffd 100644 --- a/azurefunctions/README.md +++ b/azurefunctions/README.md @@ -6,7 +6,7 @@ In this article, you follow steps to create and run a simple azure durable funct The following are the requirements for you local environment: -- The Java Developer Kit, version 8 or 11, is required. Between the two, JDK 11 is recommended. +- The Java Developer Kit, version 11 or higher, is required. - The `JAVA_HOME` environment variable must be set to the install location of the correct version of the JDK. - [Apache Maven](https://maven.apache.org/), version 3.0 or above for azure function app creation, is required for using automatic project creation tools. - If Maven isn't your preferred development tool, check out our similar tutorials to [create a function app](https://docs.microsoft.com/en-us/azure/azure-functions/create-first-function-cli-java?tabs=bash%2Cazure-cli%2Cbrowser). This README also contains instructions for [Gradle](https://gradle.org/). diff --git a/client/build.gradle b/client/build.gradle index 5b305ec9..0225576f 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -17,10 +17,7 @@ archivesBaseName = 'durabletask-client' def grpcVersion = '1.69.0' def protocVersion = '3.25.5' def jacksonVersion = '2.15.3' -// When build on local, you need to set this value to your local jdk11 directory. -// Java11 is used to compile and run all the tests. -// Example for Windows: C:/Program Files/Java/openjdk-11.0.12_7/ -def PATH_TO_TEST_JAVA_RUNTIME = System.env.JDK_11 ?: System.getProperty("java.home") +// Java 11 is now the minimum required version for both compilation and testing dependencies { @@ -53,18 +50,17 @@ dependencies { testImplementation 'io.netty:netty-handler:4.1.119.Final' testImplementation "io.netty:netty-tcnative-boringssl-static:2.0.59.Final" - testImplementation 'org.bouncycastle:bcprov-jdk15on:1.78' + testImplementation 'org.bouncycastle:bcprov-jdk15on:1.70' + testImplementation 'org.bouncycastle:bcpkix-jdk15on:1.70' } compileJava { - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 } compileTestJava { sourceCompatibility = JavaVersion.VERSION_11 targetCompatibility = JavaVersion.VERSION_11 - options.fork = true - options.forkOptions.executable = "${PATH_TO_TEST_JAVA_RUNTIME}/bin/javac" } task downloadProtoFiles { @@ -115,10 +111,6 @@ sourceSets { } } -tasks.withType(Test) { - executable = new File("${PATH_TO_TEST_JAVA_RUNTIME}", 'bin/java') -} - test { useJUnitPlatform { // Skip tests tagged as "integration" since those are slower From 4e6dadf391237b2b846f5ddddc82ce4ab4dcd52f Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Thu, 24 Jul 2025 12:00:09 -0500 Subject: [PATCH 08/11] update contrib md Signed-off-by: Cassandra Coyle --- CONTRIBUTING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 07dd5f5f..66140126 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -21,7 +21,7 @@ To ensure all build components work correctly after making changes: ./gradlew test # Run SpotBugs -./gradlew spotbugsMain +./gradlew spotbugsMain spotbugsTest ``` ## Updating Protobuf Definitions From 5b6bb563d4da020f04cd5c2f2b61f3f08bd701ff Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Thu, 24 Jul 2025 12:22:05 -0500 Subject: [PATCH 09/11] try bulid.gradle tweak Signed-off-by: Cassandra Coyle --- client/build.gradle | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/client/build.gradle b/client/build.gradle index 0225576f..44e7c086 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -68,21 +68,33 @@ task downloadProtoFiles { doLast { def protoDir = file("${rootProject.projectDir}/internal/durabletask-protobuf/protos") + def protoFile = new File(protoDir, 'orchestrator_service.proto') + def commitHashFile = new File("${rootProject.projectDir}/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH") protoDir.mkdirs() // Download the proto file new URL("https://raw.githubusercontent.com/dapr/durabletask-protobuf/${ext.branch}/protos/orchestrator_service.proto") .withInputStream { i -> - new File(protoDir, 'orchestrator_service.proto').withOutputStream { it << i } + protoFile.withOutputStream { it << i } } - - // Get and save the commit hash - def commitHashFile = new File("${rootProject.projectDir}/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH") - def commitApiUrl = new URL("https://api.github.com/repos/dapr/durabletask-protobuf/commits?path=protos/orchestrator_service.proto&sha=${ext.branch}&per_page=1") - def connection = commitApiUrl.openConnection() - connection.setRequestProperty('Accept', 'application/vnd.github.v3+json') - def commitHash = new groovy.json.JsonSlurper().parse(connection.inputStream)[0].sha - commitHashFile.text = commitHash + + try { + def commitApiUrl = new URL("https://api.github.com/repos/dapr/durabletask-protobuf/commits?path=protos/orchestrator_service.proto&sha=${ext.branch}&per_page=1") + def connection = commitApiUrl.openConnection() + connection.setRequestProperty('Accept', 'application/vnd.github.v3+json') + connection.setRequestProperty('User-Agent', 'durabletask-java-build') + + // Add GitHub token if available (automatically provided in GitHub Actions) + if (System.env.GITHUB_TOKEN) { + connection.setRequestProperty('Authorization', "token ${System.env.GITHUB_TOKEN}") + } + + def commitHash = new groovy.json.JsonSlurper().parse(connection.inputStream)[0].sha + commitHashFile.text = commitHash + logger.info("Successfully updated proto commit hash: ${commitHash}") + } catch (Exception e) { + logger.error("Failed to fetch commit hash from GitHub API: ${e.message}") + } } } From 81e280387f62b51f29e7a4a6ee2770e6cd28f84a Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Thu, 31 Jul 2025 08:54:42 -0500 Subject: [PATCH 10/11] PR feedback Signed-off-by: Cassandra Coyle --- .../src/main/java/io/dapr/durabletask/TaskOptions.java | 10 +++++----- .../io/dapr/durabletask/TaskOrchestrationExecutor.java | 6 +++++- .../test/java/io/dapr/durabletask/TaskOptionsTest.java | 10 +++------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/src/main/java/io/dapr/durabletask/TaskOptions.java b/client/src/main/java/io/dapr/durabletask/TaskOptions.java index d5d8a739..79ff542f 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOptions.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOptions.java @@ -10,10 +10,10 @@ public final class TaskOptions { private final RetryHandler retryHandler; private final String appID; - private TaskOptions(Builder builder) { - this.retryPolicy = builder.retryPolicy; - this.retryHandler = builder.retryHandler; - this.appID = builder.appID; + private TaskOptions(RetryPolicy retryPolicy, RetryHandler retryHandler, String appID) { + this.retryPolicy = retryPolicy; + this.retryHandler = retryHandler; + this.appID = appID; } /** @@ -142,7 +142,7 @@ public Builder appID(String appID) { * @return a new TaskOptions instance */ public TaskOptions build() { - return new TaskOptions(this); + return new TaskOptions(this.retryPolicy, this.retryHandler, this.appID); } } } diff --git a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index 0a60b5ea..6405a114 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -147,6 +147,10 @@ public String getAppId() { return this.appId; } + private void setAppId(String appId) { + this.appId = appId; + } + @Override public Instant getCurrentInstant() { // TODO: Throw if instant is null @@ -874,7 +878,7 @@ private void processEvent(HistoryEvent e) { this.setInput(executionStarted.getInput().getValue()); this.setInstanceId(executionStarted.getOrchestrationInstance().getInstanceId()); this.logger.fine(() -> this.instanceId + ": Workflow execution started"); - this.appId = e.getRouter().getSourceAppID(); + this.setAppId(e.getRouter().getSourceAppID()); // Create and invoke the workflow orchestrator TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(executionStarted.getName()); diff --git a/client/src/test/java/io/dapr/durabletask/TaskOptionsTest.java b/client/src/test/java/io/dapr/durabletask/TaskOptionsTest.java index e258d7f4..47e90f89 100644 --- a/client/src/test/java/io/dapr/durabletask/TaskOptionsTest.java +++ b/client/src/test/java/io/dapr/durabletask/TaskOptionsTest.java @@ -121,13 +121,8 @@ public boolean handle(RetryContext context) { @Test void taskOptionsWithBuilderChaining() { - RetryPolicy retryPolicy = new RetryPolicy(2, Duration.ofSeconds(30)); - RetryHandler retryHandler = new RetryHandler() { - @Override - public boolean handle(RetryContext context) { - return context.getLastAttemptNumber() < 1; - } - }; + RetryPolicy retryPolicy = new RetryPolicy(3, Duration.ofSeconds(1)); + RetryHandler retryHandler = context -> true; TaskOptions options = TaskOptions.builder() .retryPolicy(retryPolicy) @@ -135,6 +130,7 @@ public boolean handle(RetryContext context) { .appID("test-app") .build(); + assertNotNull(options); assertTrue(options.hasRetryPolicy()); assertEquals(retryPolicy, options.getRetryPolicy()); assertTrue(options.hasRetryHandler()); From d6d3f47b6683ad727d932795a692a9212738622e Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Thu, 31 Jul 2025 08:56:34 -0500 Subject: [PATCH 11/11] up version to 1.5.8 to queue up for release Signed-off-by: Cassandra Coyle --- client/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/build.gradle b/client/build.gradle index 44e7c086..bab0b189 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -11,7 +11,7 @@ plugins { } group 'io.dapr' -version = '1.5.7' +version = '1.5.8' archivesBaseName = 'durabletask-client' def grpcVersion = '1.69.0'