Skip to content

Commit 768aa2e

Browse files
committed
chore: Refactor durable task and introduce telemetry
Signed-off-by: Javier Aliaga <[email protected]>
1 parent 7d46cfa commit 768aa2e

File tree

10 files changed

+390
-124
lines changed

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import com.google.protobuf.StringValue;
1717
import com.google.protobuf.Timestamp;
1818
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
19-
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TraceContext;
2019
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
2120
import io.grpc.Channel;
2221
import io.grpc.ChannelCredentials;
@@ -30,7 +29,6 @@
3029
import io.grpc.netty.NettyChannelBuilder;
3130
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
3231
import io.opentelemetry.api.GlobalOpenTelemetry;
33-
import io.opentelemetry.api.trace.Span;
3432
import io.opentelemetry.api.trace.Tracer;
3533

3634
import javax.annotation.Nullable;
@@ -46,7 +44,6 @@
4644
import java.util.concurrent.TimeUnit;
4745
import java.util.concurrent.TimeoutException;
4846
import java.util.concurrent.atomic.AtomicReference;
49-
import java.util.function.Consumer;
5047
import java.util.logging.Logger;
5148

5249
/**
@@ -204,28 +201,12 @@ public String scheduleNewOrchestrationInstance(
204201

205202
AtomicReference<OrchestratorService.CreateInstanceResponse> response = new AtomicReference<>();
206203

207-
this.observe("dapr.workflow.grpc.startInstance", (TraceContext context) -> {
208-
builder.setParentTraceContext(context);
209-
OrchestratorService.CreateInstanceRequest request = builder.build();
210-
response.set(this.sidecarClient.startInstance(request));
211-
});
204+
OrchestratorService.CreateInstanceRequest request = builder.build();
205+
response.set(this.sidecarClient.startInstance(request));
212206

213207
return response.get().getInstanceId();
214208
}
215209

216-
private void observe(String spanName, Consumer<TraceContext> fn) {
217-
Span span = tracer.spanBuilder(spanName).startSpan();
218-
try {
219-
TraceContext.Builder traceContextBuilder = TraceContext.newBuilder();
220-
traceContextBuilder.setTraceParent(span.getSpanContext().getTraceId());
221-
var context = traceContextBuilder.build();
222-
223-
fn.accept(context);
224-
} finally {
225-
span.end();
226-
}
227-
}
228-
229210
@Override
230211
public void raiseEvent(String instanceId, String eventName, Object eventPayload) {
231212
Helpers.throwIfArgumentNull(instanceId, "instanceId");
@@ -239,10 +220,8 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload)
239220
builder.setInput(StringValue.of(serializedPayload));
240221
}
241222

242-
this.observe("dapr.workflow.grpc.raiseEvent", (TraceContext context) -> {
243-
OrchestratorService.RaiseEventRequest request = builder.build();
244-
this.sidecarClient.raiseEvent(request);
245-
});
223+
OrchestratorService.RaiseEventRequest request = builder.build();
224+
this.sidecarClient.raiseEvent(request);
246225

247226
}
248227

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 13 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,17 @@
1313

1414
package io.dapr.durabletask;
1515

16-
import com.google.protobuf.StringValue;
1716
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
18-
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TaskFailureDetails;
1917
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
18+
import io.dapr.durabletask.runner.ActivityRunner;
19+
import io.dapr.durabletask.runner.OrchestratorRunner;
2020
import io.grpc.Channel;
2121
import io.grpc.ManagedChannel;
2222
import io.grpc.ManagedChannelBuilder;
2323
import io.grpc.Status;
2424
import io.grpc.StatusRuntimeException;
25+
import io.opentelemetry.api.GlobalOpenTelemetry;
26+
import io.opentelemetry.api.trace.Tracer;
2527
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
2628
import io.opentelemetry.context.Context;
2729
import io.opentelemetry.context.propagation.TextMapGetter;
@@ -54,6 +56,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
5456
private final Duration maximumTimerInterval;
5557
private final ExecutorService workerPool;
5658
private final String appId; // App ID for cross-app routing
59+
private final Tracer tracer;
5760

5861
private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient;
5962
private final boolean isExecutorServiceManaged;
@@ -85,6 +88,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
8588
sidecarGrpcChannel = this.managedSidecarChannel;
8689
}
8790

91+
this.tracer = GlobalOpenTelemetry.getTracer("dapr-workflow");
92+
8893
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
8994
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
9095
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval
@@ -95,6 +100,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
95100
this.workerPool = Context.taskWrapping(rawExecutor);
96101

97102
this.isExecutorServiceManaged = builder.executorService == null;
103+
104+
98105
}
99106

100107
/**
@@ -167,109 +174,25 @@ public void startAndBlock() {
167174
while (workItemStream.hasNext()) {
168175
OrchestratorService.WorkItem workItem = workItemStream.next();
169176
OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase();
177+
170178
if (requestType == OrchestratorService.WorkItem.RequestCase.ORCHESTRATORREQUEST) {
171179
OrchestratorService.OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();
172-
logger.log(Level.FINEST,
180+
logger.log(Level.INFO,
173181
String.format("Processing orchestrator request for instance: {0}",
174182
orchestratorRequest.getInstanceId()));
175183

176-
// TODO: Error handling
177-
this.workerPool.submit(() -> {
178-
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
179-
orchestratorRequest.getPastEventsList(),
180-
orchestratorRequest.getNewEventsList());
181-
182-
OrchestratorService.OrchestratorResponse response = OrchestratorService.OrchestratorResponse.newBuilder()
183-
.setInstanceId(orchestratorRequest.getInstanceId())
184-
.addAllActions(taskOrchestratorResult.getActions())
185-
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
186-
.setCompletionToken(workItem.getCompletionToken())
187-
.build();
188-
189-
try {
190-
this.sidecarClient.completeOrchestratorTask(response);
191-
logger.log(Level.FINEST,
192-
"Completed orchestrator request for instance: {0}",
193-
orchestratorRequest.getInstanceId());
194-
} catch (StatusRuntimeException e) {
195-
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
196-
logger.log(Level.WARNING,
197-
"The sidecar at address {0} is unavailable while completing the orchestrator task.",
198-
this.getSidecarAddress());
199-
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
200-
logger.log(Level.WARNING,
201-
"Durable Task worker has disconnected from {0} while completing the orchestrator task.",
202-
this.getSidecarAddress());
203-
} else {
204-
logger.log(Level.WARNING,
205-
"Unexpected failure completing the orchestrator task at {0}.",
206-
this.getSidecarAddress());
207-
}
208-
}
209-
});
184+
this.workerPool.submit(new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer));
210185
} else if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) {
211186
OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest();
212187

213-
214188
logger.log(Level.INFO,
215189
String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s",
216190
activityRequest.getName(),
217191
activityRequest.getOrchestrationInstance().getInstanceId(),
218192
Context.current()));
219193

220-
// Extract trace context from the ActivityRequest and set it as current
221-
Context traceContext = extractTraceContext(activityRequest);
194+
this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer));
222195

223-
// TODO: Error handling
224-
this.workerPool.submit(() -> {
225-
String output = null;
226-
TaskFailureDetails failureDetails = null;
227-
try {
228-
output = taskActivityExecutor.execute(
229-
activityRequest.getName(),
230-
activityRequest.getInput().getValue(),
231-
activityRequest.getTaskExecutionId(),
232-
activityRequest.getTaskId(),
233-
activityRequest.getParentTraceContext().getTraceParent());
234-
} catch (Throwable e) {
235-
failureDetails = TaskFailureDetails.newBuilder()
236-
.setErrorType(e.getClass().getName())
237-
.setErrorMessage(e.getMessage())
238-
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
239-
.build();
240-
}
241-
242-
OrchestratorService.ActivityResponse.Builder responseBuilder = OrchestratorService.ActivityResponse
243-
.newBuilder()
244-
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
245-
.setTaskId(activityRequest.getTaskId())
246-
.setCompletionToken(workItem.getCompletionToken());
247-
248-
if (output != null) {
249-
responseBuilder.setResult(StringValue.of(output));
250-
}
251-
252-
if (failureDetails != null) {
253-
responseBuilder.setFailureDetails(failureDetails);
254-
}
255-
256-
try {
257-
this.sidecarClient.completeActivityTask(responseBuilder.build());
258-
} catch (StatusRuntimeException e) {
259-
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
260-
logger.log(Level.WARNING,
261-
"The sidecar at address {0} is unavailable while completing the activity task.",
262-
this.getSidecarAddress());
263-
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
264-
logger.log(Level.WARNING,
265-
"Durable Task worker has disconnected from {0} while completing the activity task.",
266-
this.getSidecarAddress());
267-
} else {
268-
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.",
269-
this.getSidecarAddress());
270-
}
271-
}
272-
});
273196
} else if (requestType == OrchestratorService.WorkItem.RequestCase.HEALTHPING) {
274197
// No-op
275198
} else {

durabletask-client/src/main/java/io/dapr/durabletask/FailureDetails.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,13 @@ public boolean isCausedBy(Class<? extends Exception> exceptionClass) {
124124
}
125125
}
126126

127-
static String getFullStackTrace(Throwable e) {
127+
/**
128+
* Gets the full stack trace of the specified exception.
129+
*
130+
* @param e the exception
131+
* @return the full stack trace of the exception
132+
*/
133+
public static String getFullStackTrace(Throwable e) {
128134
StackTraceElement[] elements = e.getStackTrace();
129135

130136
// Plan for 256 characters per stack frame (which is likely on the high-end)

durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,18 @@
1616
import java.util.HashMap;
1717
import java.util.logging.Logger;
1818

19-
final class TaskActivityExecutor {
19+
public final class TaskActivityExecutor {
2020
private final HashMap<String, TaskActivityFactory> activityFactories;
2121
private final DataConverter dataConverter;
2222
private final Logger logger;
2323

24+
/**
25+
* Creates a new TaskActivityExecutor.
26+
*
27+
* @param activityFactories the activity factories to use for creating activities
28+
* @param dataConverter the data converter to use for serializing and deserializing activity inputs and outputs
29+
* @param logger the logger to use for logging
30+
*/
2431
public TaskActivityExecutor(
2532
HashMap<String, TaskActivityFactory> activityFactories,
2633
DataConverter dataConverter,
@@ -30,6 +37,17 @@ public TaskActivityExecutor(
3037
this.logger = logger;
3138
}
3239

40+
/**
41+
* Executes an activity task.
42+
*
43+
* @param taskName the name of the activity task to execute
44+
* @param input the serialized input payload for the activity task
45+
* @param taskExecutionId the unique ID for the task execution
46+
* @param taskId the auto-incrementing ID for the task
47+
* @param traceParent the traceparent header value
48+
* @return the serialized output payload for the activity task, or null if the activity task returned null.
49+
* @throws Throwable if an unhandled exception occurs during activity task execution.
50+
*/
3351
public String execute(String taskName, String input,
3452
String taskExecutionId, int taskId, String traceParent) throws Throwable {
3553
TaskActivityFactory factory = this.activityFactories.get(taskName);

durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
import java.util.function.IntFunction;
4545
import java.util.logging.Logger;
4646

47-
final class TaskOrchestrationExecutor {
47+
public final class TaskOrchestrationExecutor {
4848

4949
private static final String EMPTY_STRING = "";
5050
private final HashMap<String, TaskOrchestrationFactory> orchestrationFactories;
@@ -53,6 +53,15 @@ final class TaskOrchestrationExecutor {
5353
private final Duration maximumTimerInterval;
5454
private final String appId;
5555

56+
/**
57+
* Creates a new TaskOrchestrationExecutor.
58+
*
59+
* @param orchestrationFactories map of orchestration names to their factories
60+
* @param dataConverter converter for serializing/deserializing data
61+
* @param maximumTimerInterval maximum duration for timer intervals
62+
* @param logger logger for orchestration execution
63+
* @param appId application ID for cross-app routing
64+
*/
5665
public TaskOrchestrationExecutor(
5766
HashMap<String, TaskOrchestrationFactory> orchestrationFactories,
5867
DataConverter dataConverter,
@@ -66,6 +75,13 @@ public TaskOrchestrationExecutor(
6675
this.appId = appId; // extracted from router
6776
}
6877

78+
/**
79+
* Executes the orchestration with the given past and new events.
80+
*
81+
* @param pastEvents list of past history events
82+
* @param newEvents list of new history events
83+
* @return the result of the orchestrator execution
84+
*/
6985
public TaskOrchestratorResult execute(List<OrchestratorService.HistoryEvent> pastEvents,
7086
List<OrchestratorService.HistoryEvent> newEvents) {
7187
ContextImplTask context = new ContextImplTask(pastEvents, newEvents);

durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,18 @@
1818
import java.util.Collection;
1919
import java.util.Collections;
2020

21-
final class TaskOrchestratorResult {
21+
public final class TaskOrchestratorResult {
2222

2323
private final Collection<OrchestratorService.OrchestratorAction> actions;
2424

2525
private final String customStatus;
2626

27+
/**
28+
* Creates a new TaskOrchestratorResult.
29+
*
30+
* @param actions the collection of orchestrator actions
31+
* @param customStatus the custom status of the orchestrator
32+
*/
2733
public TaskOrchestratorResult(Collection<OrchestratorService.OrchestratorAction> actions, String customStatus) {
2834
this.actions = Collections.unmodifiableCollection(actions);
2935
;

0 commit comments

Comments
 (0)