From afa2192e50117b4b541b27134518191b3d7296ab Mon Sep 17 00:00:00 2001 From: salaboy Date: Thu, 20 Nov 2025 09:58:28 +0000 Subject: [PATCH 1/3] adding tracing Signed-off-by: salaboy --- client/build.gradle | 6 ++- .../durabletask/DurableTaskGrpcClient.java | 47 +++++++++++++++++-- .../DurableTaskGrpcClientBuilder.java | 13 +++++ .../durabletask/DurableTaskGrpcWorker.java | 1 + .../dapr/durabletask/TaskActivityContext.java | 6 +++ .../durabletask/TaskActivityExecutor.java | 15 ++++-- .../PROTO_SOURCE_COMMIT_HASH | 2 +- .../protos/orchestrator_service.proto | 47 +++++++++++++++++++ 8 files changed, 127 insertions(+), 10 deletions(-) diff --git a/client/build.gradle b/client/build.gradle index aeddf4ff..265c98d1 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -11,12 +11,14 @@ plugins { } group 'io.dapr' -version = '1.5.10' +version = '1.5.11-SNAPSHOT' archivesBaseName = 'durabletask-client' def grpcVersion = '1.69.0' def protocVersion = '3.25.5' def jacksonVersion = '2.15.3' +def micrometerVersion = '1.16.0' +def otelVersion = '1.56.0' // Java 11 is now the minimum required version for both compilation and testing dependencies { @@ -25,6 +27,8 @@ dependencies { implementation "io.grpc:grpc-protobuf:${grpcVersion}" implementation "io.grpc:grpc-stub:${grpcVersion}" implementation 'com.google.protobuf:protobuf-java:3.25.5' + implementation "io.micrometer:micrometer-observation:${micrometerVersion}" + implementation "io.opentelemetry:opentelemetry-api:${otelVersion}" runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}" compileOnly "org.apache.tomcat:annotations-api:6.0.53" diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java index 0ec0291a..aad28311 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java @@ -4,6 +4,8 @@ import com.google.protobuf.StringValue; import com.google.protobuf.Timestamp; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; import io.dapr.durabletask.implementation.protobuf.OrchestratorService.*; import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*; @@ -38,6 +40,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient { private final DataConverter dataConverter; private final ManagedChannel managedSidecarChannel; private final TaskHubSidecarServiceBlockingStub sidecarClient; + private final Tracer tracer; DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) { this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); @@ -109,8 +112,9 @@ public final class DurableTaskGrpcClient extends DurableTaskClient { // Need to keep track of this channel so we can dispose it on close() this.managedSidecarChannel = channelBuilder.build(); sidecarGrpcChannel = this.managedSidecarChannel; - } + } + this.tracer = builder.tracer; this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel); } @@ -169,6 +173,23 @@ public String scheduleNewOrchestrationInstance( builder.setScheduledStartTimestamp(ts); } + if (tracer != null) { + Span span = tracer.spanBuilder("dapr.workflow.grpc.startInstance") + .startSpan(); + try { + TraceContext.Builder traceContextBuilder = TraceContext.newBuilder(); + traceContextBuilder.setTraceParent(span.getSpanContext().getTraceId()); + traceContextBuilder.setSpanID(span.getSpanContext().getSpanId()); + builder.setParentTraceContext(traceContextBuilder.build()); + + CreateInstanceRequest request = builder.build(); + CreateInstanceResponse response = this.sidecarClient.startInstance(request); + return response.getInstanceId(); + } finally { + span.end(); + } + } + CreateInstanceRequest request = builder.build(); CreateInstanceResponse response = this.sidecarClient.startInstance(request); return response.getInstanceId(); @@ -180,11 +201,27 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload) Helpers.throwIfArgumentNull(eventName, "eventName"); RaiseEventRequest.Builder builder = RaiseEventRequest.newBuilder() - .setInstanceId(instanceId) - .setName(eventName); + .setInstanceId(instanceId) + .setName(eventName); if (eventPayload != null) { - String serializedPayload = this.dataConverter.serialize(eventPayload); - builder.setInput(StringValue.of(serializedPayload)); + String serializedPayload = this.dataConverter.serialize(eventPayload); + builder.setInput(StringValue.of(serializedPayload)); + } + + if (tracer != null) { + Span span = tracer.spanBuilder("dapr.workflow.grpc.raiseEvent") + .startSpan(); + try { + TraceContext.Builder traceContextBuilder = TraceContext.newBuilder(); + traceContextBuilder.setTraceParent(span.getSpanContext().getTraceId()); + traceContextBuilder.setSpanID(span.getSpanContext().getSpanId()); + builder.setParentTraceContext(traceContextBuilder.build()); + + RaiseEventRequest request = builder.build(); + this.sidecarClient.raiseEvent(request); + } finally { + span.end(); + } } RaiseEventRequest request = builder.build(); diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java index bac17849..3e764d15 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java @@ -3,6 +3,7 @@ package io.dapr.durabletask; import io.grpc.Channel; +import io.opentelemetry.api.trace.Tracer; /** * Builder class for constructing new {@link DurableTaskClient} objects that communicate with a sidecar process @@ -16,6 +17,7 @@ public final class DurableTaskGrpcClientBuilder { String tlsCertPath; String tlsKeyPath; boolean insecure; + Tracer tracer; /** * Sets the {@link DataConverter} to use for converting serializable data payloads. @@ -57,6 +59,17 @@ public DurableTaskGrpcClientBuilder port(int port) { return this; } + /** + * Sets the Tracer object to be used by DurableTaskClient to emit traces + * + * @param tracer to be used by the DurableTaskClient + * @return this builder object + */ + public DurableTaskGrpcClientBuilder tracer(Tracer tracer) { + this.tracer = tracer; + return this; + } + /** * Sets the path to the TLS CA certificate file for server authentication. * If not set, the system's default CA certificates will be used. diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index d9e3bc65..1efc85cc 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -200,6 +200,7 @@ public void startAndBlock() { activityRequest.getName(), activityRequest.getInput().getValue(), activityRequest.getTaskExecutionId(), + activityRequest.getParentTraceContext().getTraceParent(), activityRequest.getTaskId()); } catch (Throwable e) { failureDetails = TaskFailureDetails.newBuilder() diff --git a/client/src/main/java/io/dapr/durabletask/TaskActivityContext.java b/client/src/main/java/io/dapr/durabletask/TaskActivityContext.java index 316fb523..a641b1c2 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskActivityContext.java +++ b/client/src/main/java/io/dapr/durabletask/TaskActivityContext.java @@ -34,4 +34,10 @@ public interface TaskActivityContext { * @return the task id of the current task activity */ int getTaskId(); + + /** + * Gets the trace parent id for the current workflow execution. + * @return trace parent id + */ + String getTraceParentId(); } diff --git a/client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java b/client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java index a7513192..9c1bc882 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java +++ b/client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java @@ -19,7 +19,7 @@ public TaskActivityExecutor( this.logger = logger; } - public String execute(String taskName, String input, String taskExecutionId, int taskId) throws Throwable { + public String execute(String taskName, String input, String taskExecutionId, String parentTraceId, int taskId) throws Throwable { TaskActivityFactory factory = this.activityFactories.get(taskName); if (factory == null) { throw new IllegalStateException( @@ -32,7 +32,7 @@ public String execute(String taskName, String input, String taskExecutionId, int String.format("The task factory '%s' returned a null TaskActivity object.", taskName)); } - TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionId, taskId); + TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionId, parentTraceId, taskId); // Unhandled exceptions are allowed to escape Object output = activity.run(context); @@ -48,13 +48,17 @@ private class TaskActivityContextImpl implements TaskActivityContext { private final String rawInput; private final String taskExecutionId; private final int taskId; + private final String traceParentId; private final DataConverter dataConverter = TaskActivityExecutor.this.dataConverter; - public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionId, int taskId) { + public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionId, + String traceParentId, + int taskId) { this.name = activityName; this.rawInput = rawInput; this.taskExecutionId = taskExecutionId; + this.traceParentId = traceParentId; this.taskId = taskId; } @@ -81,5 +85,10 @@ public String getTaskExecutionId() { public int getTaskId() { return this.taskId; } + + @Override + public String getTraceParentId() { + return traceParentId; + } } } diff --git a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH index d8ec2af9..203a5fcc 100644 --- a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH +++ b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH @@ -1 +1 @@ -4b86756497d875b97f9a91051781b5711c1e4fa6 \ No newline at end of file +474626b111db2d10e6d75b80dd277cf5993543ef \ No newline at end of file diff --git a/internal/durabletask-protobuf/protos/orchestrator_service.proto b/internal/durabletask-protobuf/protos/orchestrator_service.proto index 41fad9de..b86bba61 100644 --- a/internal/durabletask-protobuf/protos/orchestrator_service.proto +++ b/internal/durabletask-protobuf/protos/orchestrator_service.proto @@ -477,6 +477,17 @@ message PurgeInstancesRequest { PurgeInstanceFilter purgeInstanceFilter = 2; } bool recursive = 3; + + // force will force a purge of a workflow, regardless of its current + // runtime state, or whether an active worker can process it, the backend + // will attempt to delete it anyway. This neccessarily means the purging is + // executed out side of the workflow state machine, and therefore, can lead + // to corrupt state or broken workflow execution. Usage of this should + // _only_ be used when the client knows the workflow is not being currently + // processed. It is highly recommended to avoid using this flag unless + // absolutely necessary. + // Defaults to false. + optional bool force = 4; } message PurgeInstanceFilter { @@ -739,6 +750,9 @@ service TaskHubSidecarService { // Rerun a Workflow from a specific event ID of a workflow instance. rpc RerunWorkflowFromEvent(RerunWorkflowFromEventRequest) returns (RerunWorkflowFromEventResponse); + + rpc ListInstanceIDs (ListInstanceIDsRequest) returns (ListInstanceIDsResponse); + rpc GetInstanceHistory (GetInstanceHistoryRequest) returns (GetInstanceHistoryResponse); } message GetWorkItemsRequest { @@ -820,3 +834,36 @@ message RerunWorkflowFromEventRequest { message RerunWorkflowFromEventResponse { string newInstanceID = 1; } + +// ListInstanceIDsRequest is used to list all orchestration instances. +message ListInstanceIDsRequest { + // continuationToken is the continuation token to use for pagination. This + // is the token which the next page should start from. If not given, the + // first page will be returned. + optional string continuationToken = 1; + + // pageSize is the maximum number of instances to return for this page. If + // not given, all instances will be attempted to be returned. + optional uint32 pageSize = 2; +} + +// ListInstanceIDsResponse is the response to executing ListInstanceIDs. +message ListInstanceIDsResponse { + // instanceIds is the list of instance IDs returned. + repeated string instanceIds = 1; + + // continuationToken is the continuation token to use for pagination. If + // there are no more pages, this will be null. + optional string continuationToken = 2; +} + +// GetInstanceHistoryRequest is used to get the full history of an +// orchestration instance. +message GetInstanceHistoryRequest { + string instanceId = 1; +} + +// GetInstanceHistoryResponse is the response to executing GetInstanceHistory. +message GetInstanceHistoryResponse { + repeated HistoryEvent events = 1; +} From de8c665beb20c435e94de01ec3898a54d7febb85 Mon Sep 17 00:00:00 2001 From: salaboy Date: Thu, 20 Nov 2025 10:00:28 +0000 Subject: [PATCH 2/3] not yet for RaiseEvent Signed-off-by: salaboy --- .../java/io/dapr/durabletask/DurableTaskGrpcClient.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java index aad28311..1f65a103 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java @@ -212,10 +212,10 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload) Span span = tracer.spanBuilder("dapr.workflow.grpc.raiseEvent") .startSpan(); try { - TraceContext.Builder traceContextBuilder = TraceContext.newBuilder(); - traceContextBuilder.setTraceParent(span.getSpanContext().getTraceId()); - traceContextBuilder.setSpanID(span.getSpanContext().getSpanId()); - builder.setParentTraceContext(traceContextBuilder.build()); +// TraceContext.Builder traceContextBuilder = TraceContext.newBuilder(); +// traceContextBuilder.setTraceParent(span.getSpanContext().getTraceId()); +// traceContextBuilder.setSpanID(span.getSpanContext().getSpanId()); +// builder.setParentTraceContext(traceContextBuilder.build()); RaiseEventRequest request = builder.build(); this.sidecarClient.raiseEvent(request); From bb0e21ad589e51d3d263eebb4f9405b2f885beb9 Mon Sep 17 00:00:00 2001 From: salaboy Date: Mon, 24 Nov 2025 17:40:27 +0000 Subject: [PATCH 3/3] adding otel context wrapper for executor Signed-off-by: salaboy --- client/build.gradle | 1 + .../durabletask/DurableTaskGrpcWorker.java | 150 +++++++++++++----- 2 files changed, 113 insertions(+), 38 deletions(-) diff --git a/client/build.gradle b/client/build.gradle index 265c98d1..029c1923 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -29,6 +29,7 @@ dependencies { implementation 'com.google.protobuf:protobuf-java:3.25.5' implementation "io.micrometer:micrometer-observation:${micrometerVersion}" implementation "io.opentelemetry:opentelemetry-api:${otelVersion}" + implementation "io.opentelemetry:opentelemetry-context:${otelVersion}" runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}" compileOnly "org.apache.tomcat:annotations-api:6.0.53" diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index 1efc85cc..ce8ef122 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -9,6 +9,14 @@ import io.dapr.durabletask.implementation.protobuf.OrchestratorService.WorkItem.RequestCase; import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*; +import io.opentelemetry.context.Context; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.TextMapGetter; + import io.grpc.*; import java.time.Duration; @@ -72,7 +80,11 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; - this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool(); + + // Create the executor and wrap it to propagate OpenTelemetry context across threads + ExecutorService rawExecutor = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool(); + this.workerPool = Context.taskWrapping(rawExecutor); + this.isExecutorServiceManaged = builder.executorService == null; } @@ -186,59 +198,69 @@ public void startAndBlock() { }); } else if (requestType == RequestCase.ACTIVITYREQUEST) { ActivityRequest activityRequest = workItem.getActivityRequest(); - logger.log(Level.FINEST, - String.format("Processing activity request: %s for instance: %s}", - activityRequest.getName(), - activityRequest.getOrchestrationInstance().getInstanceId())); + logger.log(Level.INFO, + String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s", + activityRequest.getName(), + activityRequest.getOrchestrationInstance().getInstanceId(), + Context.current())); + // Extract trace context from the ActivityRequest and set it as current + Context traceContext = extractTraceContext(activityRequest); + + // The workerPool is wrapped with Context.taskWrapping() so the trace context + // will be automatically propagated to the worker thread // TODO: Error handling this.workerPool.submit(() -> { + // Make the extracted trace context current so the OTel Java agent can instrument HTTP calls + try (io.opentelemetry.context.Scope scope = traceContext.makeCurrent()) { + logger.log(Level.INFO, + String.format("Executing activity: %s in worker thread with trace context: %s", + activityRequest.getName(), + Context.current())); String output = null; TaskFailureDetails failureDetails = null; try { - output = taskActivityExecutor.execute( - activityRequest.getName(), - activityRequest.getInput().getValue(), - activityRequest.getTaskExecutionId(), - activityRequest.getParentTraceContext().getTraceParent(), - activityRequest.getTaskId()); + // Execute the activity - HTTP calls will now be instrumented with the correct trace context + output = taskActivityExecutor.execute( + activityRequest.getName(), + activityRequest.getInput().getValue(), + activityRequest.getTaskExecutionId(), + activityRequest.getParentTraceContext().getTraceParent(), + activityRequest.getTaskId()); } catch (Throwable e) { - failureDetails = TaskFailureDetails.newBuilder() - .setErrorType(e.getClass().getName()) - .setErrorMessage(e.getMessage()) - .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) - .build(); + failureDetails = TaskFailureDetails.newBuilder() + .setErrorType(e.getClass().getName()) + .setErrorMessage(e.getMessage()) + .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) + .build(); } - ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder() - .setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId()) - .setTaskId(activityRequest.getTaskId()) - .setCompletionToken(workItem.getCompletionToken()); - + .setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId()) + .setTaskId(activityRequest.getTaskId()) + .setCompletionToken(workItem.getCompletionToken()); if (output != null) { - responseBuilder.setResult(StringValue.of(output)); + responseBuilder.setResult(StringValue.of(output)); } - if (failureDetails != null) { - responseBuilder.setFailureDetails(failureDetails); + responseBuilder.setFailureDetails(failureDetails); } - try { - this.sidecarClient.completeActivityTask(responseBuilder.build()); + this.sidecarClient.completeActivityTask(responseBuilder.build()); } catch (StatusRuntimeException e) { - if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { - logger.log(Level.WARNING, - "The sidecar at address {0} is unavailable while completing the activity task.", - this.getSidecarAddress()); - } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { - logger.log(Level.WARNING, - "Durable Task worker has disconnected from {0} while completing the activity task.", - this.getSidecarAddress()); - } else { - logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", - this.getSidecarAddress()); - } + if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { + logger.log(Level.WARNING, + "The sidecar at address {0} is unavailable while completing the activity task.", + this.getSidecarAddress()); + } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { + logger.log(Level.WARNING, + "Durable Task worker has disconnected from {0} while completing the activity task.", + this.getSidecarAddress()); + } else { + logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", + this.getSidecarAddress()); + } } + }// end try-with-resources scope }); } else if (requestType == RequestCase.HEALTHPING) { // No-op @@ -310,4 +332,56 @@ private void shutDownWorkerPool() { private String getSidecarAddress() { return this.sidecarClient.getChannel().authority(); } + + /** + * Extracts trace context from the ActivityRequest's ParentTraceContext field + * and creates an OpenTelemetry Context with the parent span set. + * + * @param activityRequest The activity request containing the parent trace context + * @return A Context with the parent span set, or the current context if no trace context is present + */ + private Context extractTraceContext(ActivityRequest activityRequest) { + if (!activityRequest.hasParentTraceContext()) { + logger.log(Level.FINE, "No parent trace context in activity request"); + return Context.current(); + } + + TraceContext traceContext = activityRequest.getParentTraceContext(); + String traceparent = traceContext.getTraceParent(); + + if (traceparent == null || traceparent.isEmpty()) { + logger.log(Level.FINE, "Empty traceparent in activity request"); + return Context.current(); + } + + logger.log(Level.INFO, + String.format("Extracting trace context from ActivityRequest: traceparent=%s", traceparent)); + + // Use W3CTraceContextPropagator to extract the trace context + Map carrier = new HashMap<>(); + carrier.put("traceparent", traceparent); + if (traceContext.hasTraceState()) { + carrier.put("tracestate", traceContext.getTraceState().getValue()); + } + + TextMapGetter> getter = new TextMapGetter>() { + @Override + public Iterable keys(Map carrier) { + return carrier.keySet(); + } + + @Override + public String get(Map carrier, String key) { + return carrier.get(key); + } + }; + + Context extractedContext = W3CTraceContextPropagator.getInstance() + .extract(Context.current(), carrier, getter); + + logger.log(Level.INFO, + String.format("Extracted trace context: %s", extractedContext)); + + return extractedContext; + } }