Skip to content

Commit 3302911

Browse files
committed
cross app activity calls works
Signed-off-by: Cassandra Coyle <[email protected]>
1 parent f47fa4f commit 3302911

File tree

3 files changed

+220
-158
lines changed

3 files changed

+220
-158
lines changed
Lines changed: 154 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -1,138 +1,154 @@
1-
// Copyright (c) Microsoft Corporation. All rights reserved.
2-
// Licensed under the MIT License.
3-
package io.dapr.durabletask;
4-
5-
import io.grpc.Channel;
6-
7-
import java.time.Duration;
8-
import java.util.HashMap;
9-
import java.util.concurrent.Executor;
10-
import java.util.concurrent.ExecutorService;
11-
import java.util.concurrent.Executors;
12-
13-
/**
14-
* Builder object for constructing customized {@link DurableTaskGrpcWorker} instances.
15-
*/
16-
public final class DurableTaskGrpcWorkerBuilder {
17-
final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
18-
final HashMap<String, TaskActivityFactory> activityFactories = new HashMap<>();
19-
int port;
20-
Channel channel;
21-
DataConverter dataConverter;
22-
Duration maximumTimerInterval;
23-
ExecutorService executorService;
24-
25-
/**
26-
* Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}.
27-
*
28-
* @param factory an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}
29-
* @return this builder object
30-
*/
31-
public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) {
32-
String key = factory.getName();
33-
if (key == null || key.length() == 0) {
34-
throw new IllegalArgumentException("A non-empty task orchestration name is required.");
35-
}
36-
37-
if (this.orchestrationFactories.containsKey(key)) {
38-
throw new IllegalArgumentException(
39-
String.format("A task orchestration factory named %s is already registered.", key));
40-
}
41-
42-
this.orchestrationFactories.put(key, factory);
43-
return this;
44-
}
45-
46-
/**
47-
* Adds an activity factory to be used by the constructed {@link DurableTaskGrpcWorker}.
48-
*
49-
* @param factory an activity factory to be used by the constructed {@link DurableTaskGrpcWorker}
50-
* @return this builder object
51-
*/
52-
public DurableTaskGrpcWorkerBuilder addActivity(TaskActivityFactory factory) {
53-
// TODO: Input validation
54-
String key = factory.getName();
55-
if (key == null || key.length() == 0) {
56-
throw new IllegalArgumentException("A non-empty task activity name is required.");
57-
}
58-
59-
if (this.activityFactories.containsKey(key)) {
60-
throw new IllegalArgumentException(
61-
String.format("A task activity factory named %s is already registered.", key));
62-
}
63-
64-
this.activityFactories.put(key, factory);
65-
return this;
66-
}
67-
68-
/**
69-
* Sets the gRPC channel to use for communicating with the sidecar process.
70-
* <p>
71-
* This builder method allows you to provide your own gRPC channel for communicating with the Durable Task sidecar
72-
* endpoint. Channels provided using this method won't be closed when the worker is closed.
73-
* Rather, the caller remains responsible for shutting down the channel after disposing the worker.
74-
* <p>
75-
* If not specified, a gRPC channel will be created automatically for each constructed
76-
* {@link DurableTaskGrpcWorker}.
77-
*
78-
* @param channel the gRPC channel to use
79-
* @return this builder object
80-
*/
81-
public DurableTaskGrpcWorkerBuilder grpcChannel(Channel channel) {
82-
this.channel = channel;
83-
return this;
84-
}
85-
86-
/**
87-
* Sets the gRPC endpoint port to connect to. If not specified, the default Durable Task port number will be used.
88-
*
89-
* @param port the gRPC endpoint port to connect to
90-
* @return this builder object
91-
*/
92-
public DurableTaskGrpcWorkerBuilder port(int port) {
93-
this.port = port;
94-
return this;
95-
}
96-
97-
/**
98-
* Sets the {@link DataConverter} to use for converting serializable data payloads.
99-
*
100-
* @param dataConverter the {@link DataConverter} to use for converting serializable data payloads
101-
* @return this builder object
102-
*/
103-
public DurableTaskGrpcWorkerBuilder dataConverter(DataConverter dataConverter) {
104-
this.dataConverter = dataConverter;
105-
return this;
106-
}
107-
108-
/**
109-
* Sets the maximum timer interval. If not specified, the default maximum timer interval duration will be used.
110-
* The default maximum timer interval duration is 3 days.
111-
*
112-
* @param maximumTimerInterval the maximum timer interval
113-
* @return this builder object
114-
*/
115-
public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerInterval) {
116-
this.maximumTimerInterval = maximumTimerInterval;
117-
return this;
118-
}
119-
120-
/**
121-
* Sets the executor service that will be used to execute threads.
122-
*
123-
* @param executorService {@link ExecutorService}.
124-
* @return this builder object.
125-
*/
126-
public DurableTaskGrpcWorkerBuilder withExecutorService(ExecutorService executorService) {
127-
this.executorService = executorService;
128-
return this;
129-
}
130-
131-
/**
132-
* Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object.
133-
* @return a new {@link DurableTaskGrpcWorker} object
134-
*/
135-
public DurableTaskGrpcWorker build() {
136-
return new DurableTaskGrpcWorker(this);
137-
}
138-
}
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package io.dapr.durabletask;
4+
5+
import io.grpc.Channel;
6+
7+
import java.time.Duration;
8+
import java.util.HashMap;
9+
import java.util.concurrent.Executor;
10+
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
12+
13+
/**
14+
* Builder object for constructing customized {@link DurableTaskGrpcWorker} instances.
15+
*/
16+
public final class DurableTaskGrpcWorkerBuilder {
17+
final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
18+
final HashMap<String, TaskActivityFactory> activityFactories = new HashMap<>();
19+
int port;
20+
Channel channel;
21+
DataConverter dataConverter;
22+
Duration maximumTimerInterval;
23+
ExecutorService executorService;
24+
String appId; // App ID for cross-app routing
25+
26+
/**
27+
* Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}.
28+
*
29+
* @param factory an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}
30+
* @return this builder object
31+
*/
32+
public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) {
33+
String key = factory.getName();
34+
if (key == null || key.length() == 0) {
35+
throw new IllegalArgumentException("A non-empty task orchestration name is required.");
36+
}
37+
38+
if (this.orchestrationFactories.containsKey(key)) {
39+
throw new IllegalArgumentException(
40+
String.format("A task orchestration factory named %s is already registered.", key));
41+
}
42+
43+
this.orchestrationFactories.put(key, factory);
44+
return this;
45+
}
46+
47+
/**
48+
* Adds an activity factory to be used by the constructed {@link DurableTaskGrpcWorker}.
49+
*
50+
* @param factory an activity factory to be used by the constructed {@link DurableTaskGrpcWorker}
51+
* @return this builder object
52+
*/
53+
public DurableTaskGrpcWorkerBuilder addActivity(TaskActivityFactory factory) {
54+
// TODO: Input validation
55+
String key = factory.getName();
56+
if (key == null || key.length() == 0) {
57+
throw new IllegalArgumentException("A non-empty task activity name is required.");
58+
}
59+
60+
if (this.activityFactories.containsKey(key)) {
61+
throw new IllegalArgumentException(
62+
String.format("A task activity factory named %s is already registered.", key));
63+
}
64+
65+
this.activityFactories.put(key, factory);
66+
return this;
67+
}
68+
69+
/**
70+
* Sets the gRPC channel to use for communicating with the sidecar process.
71+
* <p>
72+
* This builder method allows you to provide your own gRPC channel for communicating with the Durable Task sidecar
73+
* endpoint. Channels provided using this method won't be closed when the worker is closed.
74+
* Rather, the caller remains responsible for shutting down the channel after disposing the worker.
75+
* <p>
76+
* If not specified, a gRPC channel will be created automatically for each constructed
77+
* {@link DurableTaskGrpcWorker}.
78+
*
79+
* @param channel the gRPC channel to use
80+
* @return this builder object
81+
*/
82+
public DurableTaskGrpcWorkerBuilder grpcChannel(Channel channel) {
83+
this.channel = channel;
84+
return this;
85+
}
86+
87+
/**
88+
* Sets the gRPC endpoint port to connect to. If not specified, the default Durable Task port number will be used.
89+
*
90+
* @param port the gRPC endpoint port to connect to
91+
* @return this builder object
92+
*/
93+
public DurableTaskGrpcWorkerBuilder port(int port) {
94+
this.port = port;
95+
return this;
96+
}
97+
98+
/**
99+
* Sets the {@link DataConverter} to use for converting serializable data payloads.
100+
*
101+
* @param dataConverter the {@link DataConverter} to use for converting serializable data payloads
102+
* @return this builder object
103+
*/
104+
public DurableTaskGrpcWorkerBuilder dataConverter(DataConverter dataConverter) {
105+
this.dataConverter = dataConverter;
106+
return this;
107+
}
108+
109+
/**
110+
* Sets the maximum timer interval. If not specified, the default maximum timer interval duration will be used.
111+
* The default maximum timer interval duration is 3 days.
112+
*
113+
* @param maximumTimerInterval the maximum timer interval
114+
* @return this builder object
115+
*/
116+
public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerInterval) {
117+
this.maximumTimerInterval = maximumTimerInterval;
118+
return this;
119+
}
120+
121+
/**
122+
* Sets the executor service that will be used to execute threads.
123+
*
124+
* @param executorService {@link ExecutorService}.
125+
* @return this builder object.
126+
*/
127+
public DurableTaskGrpcWorkerBuilder withExecutorService(ExecutorService executorService) {
128+
this.executorService = executorService;
129+
return this;
130+
}
131+
132+
/**
133+
* Sets the app ID for cross-app workflow routing.
134+
* <p>
135+
* This app ID is used to identify this worker in cross-app routing scenarios.
136+
* It should match the app ID configured in the Dapr sidecar.
137+
* <p>
138+
*
139+
* @param appId the app ID for this worker
140+
* @return this builder object
141+
*/
142+
public DurableTaskGrpcWorkerBuilder appId(String appId) {
143+
this.appId = appId;
144+
return this;
145+
}
146+
147+
/**
148+
* Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object.
149+
* @return a new {@link DurableTaskGrpcWorker} object
150+
*/
151+
public DurableTaskGrpcWorker build() {
152+
return new DurableTaskGrpcWorker(this);
153+
}
154+
}

client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ public TaskOrchestration create() {
129129
orchestrationFactories,
130130
new JacksonDataConverter(),
131131
DEFAULT_MAXIMUM_TIMER_INTERVAL,
132-
logger);
132+
logger,
133+
null); // No app ID for static runner
133134

134135
// TODO: Error handling
135136
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(

0 commit comments

Comments
 (0)