Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ plugins {
}

group 'io.dapr'
version = '1.5.10'
version = '1.5.11-SNAPSHOT'
archivesBaseName = 'durabletask-client'

def grpcVersion = '1.69.0'
def protocVersion = '3.25.5'
def jacksonVersion = '2.15.3'
def micrometerVersion = '1.16.0'
def otelVersion = '1.56.0'
// Java 11 is now the minimum required version for both compilation and testing

dependencies {
Expand All @@ -25,6 +27,9 @@ dependencies {
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "io.grpc:grpc-stub:${grpcVersion}"
implementation 'com.google.protobuf:protobuf-java:3.25.5'
implementation "io.micrometer:micrometer-observation:${micrometerVersion}"
implementation "io.opentelemetry:opentelemetry-api:${otelVersion}"
implementation "io.opentelemetry:opentelemetry-context:${otelVersion}"
runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}"

compileOnly "org.apache.tomcat:annotations-api:6.0.53"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import com.google.protobuf.StringValue;
import com.google.protobuf.Timestamp;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.*;
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*;
Expand Down Expand Up @@ -38,6 +40,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
private final DataConverter dataConverter;
private final ManagedChannel managedSidecarChannel;
private final TaskHubSidecarServiceBlockingStub sidecarClient;
private final Tracer tracer;

DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) {
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
Expand Down Expand Up @@ -109,8 +112,9 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
// Need to keep track of this channel so we can dispose it on close()
this.managedSidecarChannel = channelBuilder.build();
sidecarGrpcChannel = this.managedSidecarChannel;
}

}
this.tracer = builder.tracer;
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
}

Expand Down Expand Up @@ -169,6 +173,23 @@ public String scheduleNewOrchestrationInstance(
builder.setScheduledStartTimestamp(ts);
}

if (tracer != null) {
Span span = tracer.spanBuilder("dapr.workflow.grpc.startInstance")
.startSpan();
try {
TraceContext.Builder traceContextBuilder = TraceContext.newBuilder();
traceContextBuilder.setTraceParent(span.getSpanContext().getTraceId());
traceContextBuilder.setSpanID(span.getSpanContext().getSpanId());
builder.setParentTraceContext(traceContextBuilder.build());

CreateInstanceRequest request = builder.build();
CreateInstanceResponse response = this.sidecarClient.startInstance(request);
return response.getInstanceId();
} finally {
span.end();
}
}

CreateInstanceRequest request = builder.build();
CreateInstanceResponse response = this.sidecarClient.startInstance(request);
return response.getInstanceId();
Expand All @@ -180,11 +201,27 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload)
Helpers.throwIfArgumentNull(eventName, "eventName");

RaiseEventRequest.Builder builder = RaiseEventRequest.newBuilder()
.setInstanceId(instanceId)
.setName(eventName);
.setInstanceId(instanceId)
.setName(eventName);
if (eventPayload != null) {
String serializedPayload = this.dataConverter.serialize(eventPayload);
builder.setInput(StringValue.of(serializedPayload));
String serializedPayload = this.dataConverter.serialize(eventPayload);
builder.setInput(StringValue.of(serializedPayload));
}

if (tracer != null) {
Span span = tracer.spanBuilder("dapr.workflow.grpc.raiseEvent")
.startSpan();
try {
// TraceContext.Builder traceContextBuilder = TraceContext.newBuilder();
// traceContextBuilder.setTraceParent(span.getSpanContext().getTraceId());
// traceContextBuilder.setSpanID(span.getSpanContext().getSpanId());
// builder.setParentTraceContext(traceContextBuilder.build());

RaiseEventRequest request = builder.build();
this.sidecarClient.raiseEvent(request);
} finally {
span.end();
}
}

RaiseEventRequest request = builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package io.dapr.durabletask;

import io.grpc.Channel;
import io.opentelemetry.api.trace.Tracer;

/**
* Builder class for constructing new {@link DurableTaskClient} objects that communicate with a sidecar process
Expand All @@ -16,6 +17,7 @@ public final class DurableTaskGrpcClientBuilder {
String tlsCertPath;
String tlsKeyPath;
boolean insecure;
Tracer tracer;

/**
* Sets the {@link DataConverter} to use for converting serializable data payloads.
Expand Down Expand Up @@ -57,6 +59,17 @@ public DurableTaskGrpcClientBuilder port(int port) {
return this;
}

/**
 * Configures the OpenTelemetry {@link Tracer} that the resulting {@link DurableTaskClient}
 * will use when emitting traces.
 *
 * @param tracer the tracer instance to store on this builder for the client to use
 * @return this builder object
 */
public DurableTaskGrpcClientBuilder tracer(Tracer tracer) {
    this.tracer = tracer;
    return this;
}

/**
* Sets the path to the TLS CA certificate file for server authentication.
* If not set, the system's default CA certificates will be used.
Expand Down
149 changes: 112 additions & 37 deletions client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.WorkItem.RequestCase;
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*;

import io.opentelemetry.context.Context;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.propagation.TextMapGetter;

import io.grpc.*;

import java.time.Duration;
Expand Down Expand Up @@ -72,7 +80,11 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval
: DEFAULT_MAXIMUM_TIMER_INTERVAL;
this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool();

// Create the executor and wrap it to propagate OpenTelemetry context across threads
ExecutorService rawExecutor = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool();
this.workerPool = Context.taskWrapping(rawExecutor);

this.isExecutorServiceManaged = builder.executorService == null;
}

Expand Down Expand Up @@ -186,58 +198,69 @@ public void startAndBlock() {
});
} else if (requestType == RequestCase.ACTIVITYREQUEST) {
ActivityRequest activityRequest = workItem.getActivityRequest();
logger.log(Level.FINEST,
String.format("Processing activity request: %s for instance: %s}",
activityRequest.getName(),
activityRequest.getOrchestrationInstance().getInstanceId()));
logger.log(Level.INFO,
String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s",
activityRequest.getName(),
activityRequest.getOrchestrationInstance().getInstanceId(),
Context.current()));

// Extract the parent trace context from the ActivityRequest; it is made current inside the worker task below
Context traceContext = extractTraceContext(activityRequest);

// The workerPool is wrapped with Context.taskWrapping(), which carries this thread's current context
// over to the worker thread; the extracted trace context is made current explicitly inside the task
// TODO: Error handling
this.workerPool.submit(() -> {
// Make the extracted trace context current so the OTel Java agent can instrument HTTP calls
try (io.opentelemetry.context.Scope scope = traceContext.makeCurrent()) {
logger.log(Level.INFO,
String.format("Executing activity: %s in worker thread with trace context: %s",
activityRequest.getName(),
Context.current()));
String output = null;
TaskFailureDetails failureDetails = null;
try {
output = taskActivityExecutor.execute(
activityRequest.getName(),
activityRequest.getInput().getValue(),
activityRequest.getTaskExecutionId(),
activityRequest.getTaskId());
// Execute the activity - HTTP calls will now be instrumented with the correct trace context
output = taskActivityExecutor.execute(
activityRequest.getName(),
activityRequest.getInput().getValue(),
activityRequest.getTaskExecutionId(),
activityRequest.getParentTraceContext().getTraceParent(),
activityRequest.getTaskId());
} catch (Throwable e) {
failureDetails = TaskFailureDetails.newBuilder()
.setErrorType(e.getClass().getName())
.setErrorMessage(e.getMessage())
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
.build();
failureDetails = TaskFailureDetails.newBuilder()
.setErrorType(e.getClass().getName())
.setErrorMessage(e.getMessage())
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
.build();
}

ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder()
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
.setTaskId(activityRequest.getTaskId())
.setCompletionToken(workItem.getCompletionToken());

.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
.setTaskId(activityRequest.getTaskId())
.setCompletionToken(workItem.getCompletionToken());
if (output != null) {
responseBuilder.setResult(StringValue.of(output));
responseBuilder.setResult(StringValue.of(output));
}

if (failureDetails != null) {
responseBuilder.setFailureDetails(failureDetails);
responseBuilder.setFailureDetails(failureDetails);
}

try {
this.sidecarClient.completeActivityTask(responseBuilder.build());
this.sidecarClient.completeActivityTask(responseBuilder.build());
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
logger.log(Level.WARNING,
"The sidecar at address {0} is unavailable while completing the activity task.",
this.getSidecarAddress());
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
logger.log(Level.WARNING,
"Durable Task worker has disconnected from {0} while completing the activity task.",
this.getSidecarAddress());
} else {
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.",
this.getSidecarAddress());
}
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
logger.log(Level.WARNING,
"The sidecar at address {0} is unavailable while completing the activity task.",
this.getSidecarAddress());
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
logger.log(Level.WARNING,
"Durable Task worker has disconnected from {0} while completing the activity task.",
this.getSidecarAddress());
} else {
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.",
this.getSidecarAddress());
}
}
}// end try-with-resources scope
});
} else if (requestType == RequestCase.HEALTHPING) {
// No-op
Expand Down Expand Up @@ -309,4 +332,56 @@ private void shutDownWorkerPool() {
private String getSidecarAddress() {
return this.sidecarClient.getChannel().authority();
}

/**
 * Reads W3C trace headers out of the in-memory carrier map built by {@link #extractTraceContext}.
 * Stateless, so a single shared instance is used instead of allocating one per activity request.
 */
private static final TextMapGetter<Map<String, String>> TRACE_CARRIER_GETTER =
    new TextMapGetter<Map<String, String>>() {
      @Override
      public Iterable<String> keys(Map<String, String> carrier) {
        return carrier.keySet();
      }

      @Override
      public String get(Map<String, String> carrier, String key) {
        // Tolerate a missing carrier rather than throwing from inside the propagator.
        return carrier == null ? null : carrier.get(key);
      }
    };

/**
 * Builds the OpenTelemetry {@link Context} derived from the {@code ParentTraceContext} field of an
 * activity request.
 *
 * <p>The request's {@code traceParent} value (and its {@code traceState}, when present) are copied into
 * a carrier map under the keys {@code traceparent} and {@code tracestate}, which is then handed to
 * {@link W3CTraceContextPropagator} for extraction on top of {@link Context#current()}.
 *
 * <p>This method runs once per activity work item, so its diagnostics are logged at {@code FINE} and the
 * formatted messages are supplied lazily; nothing is formatted unless that level is enabled.
 *
 * @param activityRequest the activity request that may carry a parent trace context
 * @return the context returned by the propagator, or {@link Context#current()} when the request has no
 *         parent trace context or its traceparent is empty
 */
private Context extractTraceContext(ActivityRequest activityRequest) {
    if (!activityRequest.hasParentTraceContext()) {
        logger.log(Level.FINE, "No parent trace context in activity request");
        return Context.current();
    }

    TraceContext parentTraceContext = activityRequest.getParentTraceContext();
    String traceparent = parentTraceContext.getTraceParent();
    if (traceparent == null || traceparent.isEmpty()) {
        logger.log(Level.FINE, "Empty traceparent in activity request");
        return Context.current();
    }

    logger.log(Level.FINE,
        () -> String.format("Extracting trace context from ActivityRequest: traceparent=%s", traceparent));

    // Present the values to the propagator as if they were W3C trace-context headers.
    Map<String, String> carrier = new HashMap<>();
    carrier.put("traceparent", traceparent);
    if (parentTraceContext.hasTraceState()) {
        carrier.put("tracestate", parentTraceContext.getTraceState().getValue());
    }

    Context extractedContext = W3CTraceContextPropagator.getInstance()
        .extract(Context.current(), carrier, TRACE_CARRIER_GETTER);

    logger.log(Level.FINE, () -> String.format("Extracted trace context: %s", extractedContext));

    return extractedContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,10 @@ public interface TaskActivityContext {
* @return the task id of the current task activity
*/
int getTaskId();

/**
 * Gets the trace parent id associated with the current task activity.
 *
 * <p>NOTE(review): the gRPC worker appears to supply the {@code traceParent} value of the activity
 * request's parent trace context here; confirm the expected format (presumably a W3C
 * {@code traceparent} string) and what is returned when the request carries no trace context.
 *
 * @return the trace parent id of the current task activity
 */
String getTraceParentId();
}
15 changes: 12 additions & 3 deletions client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public TaskActivityExecutor(
this.logger = logger;
}

public String execute(String taskName, String input, String taskExecutionId, int taskId) throws Throwable {
public String execute(String taskName, String input, String taskExecutionId, String parentTraceId, int taskId) throws Throwable {
TaskActivityFactory factory = this.activityFactories.get(taskName);
if (factory == null) {
throw new IllegalStateException(
Expand All @@ -32,7 +32,7 @@ public String execute(String taskName, String input, String taskExecutionId, int
String.format("The task factory '%s' returned a null TaskActivity object.", taskName));
}

TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionId, taskId);
TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionId, parentTraceId, taskId);

// Unhandled exceptions are allowed to escape
Object output = activity.run(context);
Expand All @@ -48,13 +48,17 @@ private class TaskActivityContextImpl implements TaskActivityContext {
private final String rawInput;
private final String taskExecutionId;
private final int taskId;
private final String traceParentId;

private final DataConverter dataConverter = TaskActivityExecutor.this.dataConverter;

public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionId, int taskId) {
public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionId,
String traceParentId,
int taskId) {
this.name = activityName;
this.rawInput = rawInput;
this.taskExecutionId = taskExecutionId;
this.traceParentId = traceParentId;
this.taskId = taskId;
}

Expand All @@ -81,5 +85,10 @@ public String getTaskExecutionId() {
public int getTaskId() {
return this.taskId;
}

/** Returns the trace parent id that this context was constructed with. */
@Override
public String getTraceParentId() {
    return this.traceParentId;
}
}
}
2 changes: 1 addition & 1 deletion internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4b86756497d875b97f9a91051781b5711c1e4fa6
474626b111db2d10e6d75b80dd277cf5993543ef
Loading
Loading