Skip to content

Commit 71de013

Browse files
committed
chore: Refactor durable task and introduce telemetry
Signed-off-by: Javier Aliaga <[email protected]>
1 parent 7d46cfa commit 71de013

File tree

6 files changed

+250
-84
lines changed

6 files changed

+250
-84
lines changed

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import com.google.protobuf.StringValue;
1717
import com.google.protobuf.Timestamp;
1818
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
19-
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TraceContext;
2019
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
2120
import io.grpc.Channel;
2221
import io.grpc.ChannelCredentials;
@@ -30,7 +29,6 @@
3029
import io.grpc.netty.NettyChannelBuilder;
3130
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
3231
import io.opentelemetry.api.GlobalOpenTelemetry;
33-
import io.opentelemetry.api.trace.Span;
3432
import io.opentelemetry.api.trace.Tracer;
3533

3634
import javax.annotation.Nullable;
@@ -46,7 +44,6 @@
4644
import java.util.concurrent.TimeUnit;
4745
import java.util.concurrent.TimeoutException;
4846
import java.util.concurrent.atomic.AtomicReference;
49-
import java.util.function.Consumer;
5047
import java.util.logging.Logger;
5148

5249
/**
@@ -204,27 +201,30 @@ public String scheduleNewOrchestrationInstance(
204201

205202
AtomicReference<OrchestratorService.CreateInstanceResponse> response = new AtomicReference<>();
206203

207-
this.observe("dapr.workflow.grpc.startInstance", (TraceContext context) -> {
208-
builder.setParentTraceContext(context);
209-
OrchestratorService.CreateInstanceRequest request = builder.build();
210-
response.set(this.sidecarClient.startInstance(request));
211-
});
204+
OrchestratorService.CreateInstanceRequest request = builder.build();
205+
response.set(this.sidecarClient.startInstance(request));
206+
207+
// this.observe("dapr.workflow.grpc.startInstance", (TraceContext context) -> {
208+
// builder.setParentTraceContext(context);
209+
// OrchestratorService.CreateInstanceRequest request = builder.build();
210+
// response.set(this.sidecarClient.startInstance(request));
211+
// });
212212

213213
return response.get().getInstanceId();
214214
}
215215

216-
private void observe(String spanName, Consumer<TraceContext> fn) {
217-
Span span = tracer.spanBuilder(spanName).startSpan();
218-
try {
219-
TraceContext.Builder traceContextBuilder = TraceContext.newBuilder();
220-
traceContextBuilder.setTraceParent(span.getSpanContext().getTraceId());
221-
var context = traceContextBuilder.build();
222-
223-
fn.accept(context);
224-
} finally {
225-
span.end();
226-
}
227-
}
216+
// private void observe(String spanName, Consumer<TraceContext> fn) {
217+
// Span span = tracer.spanBuilder(spanName).startSpan();
218+
// try {
219+
// TraceContext.Builder traceContextBuilder = TraceContext.newBuilder();
220+
// traceContextBuilder.setTraceParent(span.getSpanContext().getTraceId());
221+
// var context = traceContextBuilder.build();
222+
//
223+
// fn.accept(context);
224+
// } finally {
225+
// span.end();
226+
// }
227+
// }
228228

229229
@Override
230230
public void raiseEvent(String instanceId, String eventName, Object eventPayload) {
@@ -239,10 +239,8 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload)
239239
builder.setInput(StringValue.of(serializedPayload));
240240
}
241241

242-
this.observe("dapr.workflow.grpc.raiseEvent", (TraceContext context) -> {
243-
OrchestratorService.RaiseEventRequest request = builder.build();
244-
this.sidecarClient.raiseEvent(request);
245-
});
242+
OrchestratorService.RaiseEventRequest request = builder.build();
243+
this.sidecarClient.raiseEvent(request);
246244

247245
}
248246

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 9 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@
1414
package io.dapr.durabletask;
1515

1616
import com.google.protobuf.StringValue;
17+
import io.dapr.durabletask.activity.ActivityRunner;
1718
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
18-
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TaskFailureDetails;
1919
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
2020
import io.grpc.Channel;
2121
import io.grpc.ManagedChannel;
2222
import io.grpc.ManagedChannelBuilder;
2323
import io.grpc.Status;
2424
import io.grpc.StatusRuntimeException;
25+
import io.opentelemetry.api.GlobalOpenTelemetry;
26+
import io.opentelemetry.api.trace.Tracer;
2527
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
2628
import io.opentelemetry.context.Context;
2729
import io.opentelemetry.context.propagation.TextMapGetter;
@@ -54,6 +56,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
5456
private final Duration maximumTimerInterval;
5557
private final ExecutorService workerPool;
5658
private final String appId; // App ID for cross-app routing
59+
private final Tracer tracer;
5760

5861
private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient;
5962
private final boolean isExecutorServiceManaged;
@@ -85,6 +88,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
8588
sidecarGrpcChannel = this.managedSidecarChannel;
8689
}
8790

91+
this.tracer = GlobalOpenTelemetry.getTracer("dapr-workflow");
92+
8893
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
8994
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
9095
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval
@@ -95,6 +100,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
95100
this.workerPool = Context.taskWrapping(rawExecutor);
96101

97102
this.isExecutorServiceManaged = builder.executorService == null;
103+
104+
98105
}
99106

100107
/**
@@ -217,59 +224,8 @@ public void startAndBlock() {
217224
activityRequest.getOrchestrationInstance().getInstanceId(),
218225
Context.current()));
219226

220-
// Extract trace context from the ActivityRequest and set it as current
221-
Context traceContext = extractTraceContext(activityRequest);
222-
223-
// TODO: Error handling
224-
this.workerPool.submit(() -> {
225-
String output = null;
226-
TaskFailureDetails failureDetails = null;
227-
try {
228-
output = taskActivityExecutor.execute(
229-
activityRequest.getName(),
230-
activityRequest.getInput().getValue(),
231-
activityRequest.getTaskExecutionId(),
232-
activityRequest.getTaskId(),
233-
activityRequest.getParentTraceContext().getTraceParent());
234-
} catch (Throwable e) {
235-
failureDetails = TaskFailureDetails.newBuilder()
236-
.setErrorType(e.getClass().getName())
237-
.setErrorMessage(e.getMessage())
238-
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
239-
.build();
240-
}
241-
242-
OrchestratorService.ActivityResponse.Builder responseBuilder = OrchestratorService.ActivityResponse
243-
.newBuilder()
244-
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
245-
.setTaskId(activityRequest.getTaskId())
246-
.setCompletionToken(workItem.getCompletionToken());
247-
248-
if (output != null) {
249-
responseBuilder.setResult(StringValue.of(output));
250-
}
251-
252-
if (failureDetails != null) {
253-
responseBuilder.setFailureDetails(failureDetails);
254-
}
227+
this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer));
255228

256-
try {
257-
this.sidecarClient.completeActivityTask(responseBuilder.build());
258-
} catch (StatusRuntimeException e) {
259-
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
260-
logger.log(Level.WARNING,
261-
"The sidecar at address {0} is unavailable while completing the activity task.",
262-
this.getSidecarAddress());
263-
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
264-
logger.log(Level.WARNING,
265-
"Durable Task worker has disconnected from {0} while completing the activity task.",
266-
this.getSidecarAddress());
267-
} else {
268-
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.",
269-
this.getSidecarAddress());
270-
}
271-
}
272-
});
273229
} else if (requestType == OrchestratorService.WorkItem.RequestCase.HEALTHPING) {
274230
// No-op
275231
} else {

durabletask-client/src/main/java/io/dapr/durabletask/FailureDetails.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,13 @@ public boolean isCausedBy(Class<? extends Exception> exceptionClass) {
124124
}
125125
}
126126

127-
static String getFullStackTrace(Throwable e) {
127+
/**
128+
* Gets the full stack trace of the specified exception.
129+
*
130+
* @param e the exception
131+
* @return the full stack trace of the exception
132+
*/
133+
public static String getFullStackTrace(Throwable e) {
128134
StackTraceElement[] elements = e.getStackTrace();
129135

130136
// Plan for 256 characters per stack frame (which is likely on the high-end)

durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,18 @@
1616
import java.util.HashMap;
1717
import java.util.logging.Logger;
1818

19-
final class TaskActivityExecutor {
19+
public final class TaskActivityExecutor {
2020
private final HashMap<String, TaskActivityFactory> activityFactories;
2121
private final DataConverter dataConverter;
2222
private final Logger logger;
2323

24+
/**
25+
* Constructor.
26+
*
27+
* @param activityFactories the activity factories to use for creating activities
28+
* @param dataConverter the data converter to use for serializing and deserializing activity inputs and outputs
29+
* @param logger the logger to use for logging
30+
*/
2431
public TaskActivityExecutor(
2532
HashMap<String, TaskActivityFactory> activityFactories,
2633
DataConverter dataConverter,
@@ -30,6 +37,17 @@ public TaskActivityExecutor(
3037
this.logger = logger;
3138
}
3239

40+
/**
41+
* Executes an activity task.
42+
*
43+
* @param taskName the name of the activity task to execute
44+
* @param input the serialized input payload for the activity task
45+
* @param taskExecutionId Unique ID for the task execution.
46+
* @param taskId Auto-incrementing ID for the task.
47+
* @param traceParent The traceparent header value.
48+
* @return the serialized output payload for the activity task, or null if the activity task returned null.
49+
* @throws Throwable if an unhandled exception occurs during activity task execution.
50+
*/
3351
public String execute(String taskName, String input,
3452
String taskExecutionId, int taskId, String traceParent) throws Throwable {
3553
TaskActivityFactory factory = this.activityFactories.get(taskName);

0 commit comments

Comments
 (0)