Skip to content

Commit f47fa4f

Browse files
committed
wip cross app call activity for wf
Signed-off-by: Cassandra Coyle <[email protected]>
1 parent ee93882 commit f47fa4f

File tree

7 files changed

+82
-5
lines changed

7 files changed

+82
-5
lines changed

CONTRIBUTING.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,19 @@ When updating the protobuf definitions in `internal/durabletask-protobuf/protos/
1919

2020
```shell
2121
./gradlew generateProto
22-
```
22+
```
23+
24+
## Test locally from dapr/java-sdk
25+
26+
```shell
27+
./gradlew publishToMavenLocal
28+
```
29+
30+
or simply `./gradlew build publishToMavenLocal`
31+
32+
Check if it was released locally with:
33+
```shell
34+
ls ~/.m2/repository/io/dapr/durabletask-client/
35+
```
36+
37+
Then update the durabletask-client in the java-sdk pom.xml file to the newest version you released locally.

client/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ dependencies {
2626
// https://github.com/grpc/grpc-java#download
2727
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
2828
implementation "io.grpc:grpc-stub:${grpcVersion}"
29+
implementation 'com.google.protobuf:protobuf-java:3.25.5'
2930
runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}"
3031

3132
compileOnly "org.apache.tomcat:annotations-api:6.0.53"
@@ -194,6 +195,10 @@ java {
194195
withJavadocJar()
195196
}
196197

198+
tasks.named('sourcesJar').configure {
199+
dependsOn tasks.named('generateProto')
200+
}
201+
197202
spotbugs {
198203
toolVersion = '4.9.2'
199204
effort = 'max'

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
3636
private final DataConverter dataConverter;
3737
private final Duration maximumTimerInterval;
3838
private final ExecutorService workerPool;
39+
private final String appId; // App ID for cross-app routing
3940

4041
private final TaskHubSidecarServiceBlockingStub sidecarClient;
4142
private final boolean isExecutorServiceManaged;
@@ -45,6 +46,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
4546
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
4647
this.orchestrationFactories.putAll(builder.orchestrationFactories);
4748
this.activityFactories.putAll(builder.activityFactories);
49+
this.appId = builder.appId;
4850

4951
Channel sidecarGrpcChannel;
5052
if (builder.channel != null) {
@@ -128,7 +130,8 @@ public void startAndBlock() {
128130
this.orchestrationFactories,
129131
this.dataConverter,
130132
this.maximumTimerInterval,
131-
logger);
133+
logger,
134+
this.appId);
132135
TaskActivityExecutor taskActivityExecutor = new TaskActivityExecutor(
133136
this.activityFactories,
134137
this.dataConverter,
@@ -143,6 +146,9 @@ public void startAndBlock() {
143146
RequestCase requestType = workItem.getRequestCase();
144147
if (requestType == RequestCase.ORCHESTRATORREQUEST) {
145148
OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();
149+
logger.log(Level.FINEST,
150+
String.format("Processing orchestrator request for instance: {0}",
151+
orchestratorRequest.getInstanceId()));
146152

147153
// TODO: Error handling
148154
this.workerPool.submit(() -> {
@@ -159,6 +165,9 @@ public void startAndBlock() {
159165

160166
try {
161167
this.sidecarClient.completeOrchestratorTask(response);
168+
logger.log(Level.FINEST,
169+
"Completed orchestrator request for instance: {0}",
170+
orchestratorRequest.getInstanceId());
162171
} catch (StatusRuntimeException e) {
163172
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
164173
logger.log(Level.WARNING,
@@ -177,7 +186,12 @@ public void startAndBlock() {
177186
});
178187
} else if (requestType == RequestCase.ACTIVITYREQUEST) {
179188
ActivityRequest activityRequest = workItem.getActivityRequest();
189+
logger.log(Level.FINEST,
190+
String.format("Processing activity request: %s for instance: %s}",
191+
activityRequest.getName(),
192+
activityRequest.getOrchestrationInstance().getInstanceId()));
180193

194+
// TODO: Error handling
181195
this.workerPool.submit(() -> {
182196
String output = null;
183197
TaskFailureDetails failureDetails = null;
@@ -227,7 +241,8 @@ public void startAndBlock() {
227241
} else if (requestType == RequestCase.HEALTHPING) {
228242
// No-op
229243
} else {
230-
logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.",
244+
logger.log(Level.WARNING,
245+
"Received and dropped an unknown '{0}' work-item from the sidecar.",
231246
requestType);
232247
}
233248
}

client/src/main/java/io/dapr/durabletask/NewOrchestrationInstanceOptions.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ public final class NewOrchestrationInstanceOptions {
1212
private String instanceId;
1313
private Object input;
1414
private Instant startTime;
15+
private String appID; // Target app ID for cross-app workflow routing
1516

1617
/**
1718
* Default constructor for the {@link NewOrchestrationInstanceOptions} class.
@@ -71,6 +72,17 @@ public NewOrchestrationInstanceOptions setStartTime(Instant startTime) {
7172
return this;
7273
}
7374

75+
/**
76+
* Sets the target app ID for cross-app workflow routing.
77+
*
78+
* @param appID the target app ID for cross-app routing
79+
* @return this {@link NewOrchestrationInstanceOptions} object
80+
*/
81+
public NewOrchestrationInstanceOptions setAppID(String appID) {
82+
this.appID = appID;
83+
return this;
84+
}
85+
7486
/**
7587
* Gets the user-specified version of the new orchestration.
7688
*
@@ -106,4 +118,22 @@ public Object getInput() {
106118
public Instant getStartTime() {
107119
return this.startTime;
108120
}
121+
122+
/**
123+
* Gets the configured target app ID for cross-app workflow routing.
124+
*
125+
* @return the configured target app ID
126+
*/
127+
public String getAppID() {
128+
return this.appID;
129+
}
130+
131+
/**
132+
* Checks if an app ID is configured for cross-app routing.
133+
*
134+
* @return true if an app ID is configured, false otherwise
135+
*/
136+
public boolean hasAppID() {
137+
return this.appID != null && !this.appID.isEmpty();
138+
}
109139
}

client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ public interface TaskOrchestrationContext {
3636
*/
3737
String getInstanceId();
3838

39+
/**
40+
* Gets the app ID of the current orchestration instance, if available.
41+
* This is used for cross-app workflow routing.
42+
* @return the app ID of the current orchestration instance, or null if not available
43+
*/
44+
String getAppId();
45+
3946
/**
4047
* Gets the current orchestration time in UTC.
4148
* @return the current orchestration time in UTC
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ce184193bcf2d4dcf643bcc934d3ea897c4c8c5c
1+
545663a4db0040bb02c642c27010e337b9be8d31

internal/durabletask-protobuf/protos/orchestrator_service.proto

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import "google/protobuf/empty.proto";
1414

1515
message TaskRouter {
1616
string source = 1; // orchestrationAppID
17-
string target = 2; // appID
17+
optional string target = 2; // appID
1818
}
1919

2020
message OrchestrationInstance {
@@ -29,6 +29,7 @@ message ActivityRequest {
2929
OrchestrationInstance orchestrationInstance = 4;
3030
int32 taskId = 5;
3131
TraceContext parentTraceContext = 6;
32+
string taskExecutionId = 7;
3233
}
3334

3435
message ActivityResponse {
@@ -99,16 +100,19 @@ message TaskScheduledEvent {
99100
google.protobuf.StringValue version = 2;
100101
google.protobuf.StringValue input = 3;
101102
TraceContext parentTraceContext = 4;
103+
string taskExecutionId = 5;
102104
}
103105

104106
message TaskCompletedEvent {
105107
int32 taskScheduledId = 1;
106108
google.protobuf.StringValue result = 2;
109+
string taskExecutionId = 3;
107110
}
108111

109112
message TaskFailedEvent {
110113
int32 taskScheduledId = 1;
111114
TaskFailureDetails failureDetails = 2;
115+
string taskExecutionId = 3;
112116
}
113117

114118
message SubOrchestrationInstanceCreatedEvent {
@@ -263,6 +267,7 @@ message ScheduleTaskAction {
263267
google.protobuf.StringValue version = 2;
264268
google.protobuf.StringValue input = 3;
265269
optional TaskRouter router = 4;
270+
string taskExecutionId = 5;
266271
}
267272

268273
message CreateSubOrchestrationAction {

0 commit comments

Comments
 (0)