Skip to content

Commit 84a6cd7

Browse files
committed
adding tracing
1 parent da53899 commit 84a6cd7

File tree

8 files changed

+127
-10
lines changed

8 files changed

+127
-10
lines changed

client/build.gradle

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@ plugins {
1111
}
1212

1313
group 'io.dapr'
14-
version = '1.5.10'
14+
version = '1.5.11-SNAPSHOT'
1515
archivesBaseName = 'durabletask-client'
1616

1717
def grpcVersion = '1.69.0'
1818
def protocVersion = '3.25.5'
1919
def jacksonVersion = '2.15.3'
20+
def micrometerVersion = '1.16.0'
21+
def otelVersion = '1.56.0'
2022
// Java 11 is now the minimum required version for both compilation and testing
2123

2224
dependencies {
@@ -25,6 +27,8 @@ dependencies {
2527
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
2628
implementation "io.grpc:grpc-stub:${grpcVersion}"
2729
implementation 'com.google.protobuf:protobuf-java:3.25.5'
30+
implementation "io.micrometer:micrometer-observation:${micrometerVersion}"
31+
implementation "io.opentelemetry:opentelemetry-api:${otelVersion}"
2832
runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}"
2933

3034
compileOnly "org.apache.tomcat:annotations-api:6.0.53"

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import com.google.protobuf.StringValue;
66
import com.google.protobuf.Timestamp;
7+
import io.opentelemetry.api.trace.Span;
8+
import io.opentelemetry.api.trace.Tracer;
79
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.*;
810
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
911
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*;
@@ -38,6 +40,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
3840
private final DataConverter dataConverter;
3941
private final ManagedChannel managedSidecarChannel;
4042
private final TaskHubSidecarServiceBlockingStub sidecarClient;
43+
private final Tracer tracer;
4144

4245
DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) {
4346
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
@@ -109,8 +112,9 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
109112
// Need to keep track of this channel so we can dispose it on close()
110113
this.managedSidecarChannel = channelBuilder.build();
111114
sidecarGrpcChannel = this.managedSidecarChannel;
112-
}
113115

116+
}
117+
this.tracer = builder.tracer;
114118
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
115119
}
116120

@@ -169,6 +173,23 @@ public String scheduleNewOrchestrationInstance(
169173
builder.setScheduledStartTimestamp(ts);
170174
}
171175

176+
if (tracer != null) {
177+
Span span = tracer.spanBuilder("dapr.workflow.grpc.startInstance")
178+
.startSpan();
179+
try {
180+
TraceContext.Builder traceContextBuilder = TraceContext.newBuilder();
181+
traceContextBuilder.setTraceParent(span.getSpanContext().getTraceId());
182+
traceContextBuilder.setSpanID(span.getSpanContext().getSpanId());
183+
builder.setParentTraceContext(traceContextBuilder.build());
184+
185+
CreateInstanceRequest request = builder.build();
186+
CreateInstanceResponse response = this.sidecarClient.startInstance(request);
187+
return response.getInstanceId();
188+
} finally {
189+
span.end();
190+
}
191+
}
192+
172193
CreateInstanceRequest request = builder.build();
173194
CreateInstanceResponse response = this.sidecarClient.startInstance(request);
174195
return response.getInstanceId();
@@ -180,11 +201,27 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload)
180201
Helpers.throwIfArgumentNull(eventName, "eventName");
181202

182203
RaiseEventRequest.Builder builder = RaiseEventRequest.newBuilder()
183-
.setInstanceId(instanceId)
184-
.setName(eventName);
204+
.setInstanceId(instanceId)
205+
.setName(eventName);
185206
if (eventPayload != null) {
186-
String serializedPayload = this.dataConverter.serialize(eventPayload);
187-
builder.setInput(StringValue.of(serializedPayload));
207+
String serializedPayload = this.dataConverter.serialize(eventPayload);
208+
builder.setInput(StringValue.of(serializedPayload));
209+
}
210+
211+
if (tracer != null) {
212+
Span span = tracer.spanBuilder("dapr.workflow.grpc.raiseEvent")
213+
.startSpan();
214+
try {
215+
TraceContext.Builder traceContextBuilder = TraceContext.newBuilder();
216+
traceContextBuilder.setTraceParent(span.getSpanContext().getTraceId());
217+
traceContextBuilder.setSpanID(span.getSpanContext().getSpanId());
218+
builder.setParentTraceContext(traceContextBuilder.build());
219+
220+
RaiseEventRequest request = builder.build();
221+
this.sidecarClient.raiseEvent(request);
222+
} finally {
223+
span.end();
224+
}
188225
}
189226

190227
RaiseEventRequest request = builder.build();

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package io.dapr.durabletask;
44

55
import io.grpc.Channel;
6+
import io.opentelemetry.api.trace.Tracer;
67

78
/**
89
* Builder class for constructing new {@link DurableTaskClient} objects that communicate with a sidecar process
@@ -16,6 +17,7 @@ public final class DurableTaskGrpcClientBuilder {
1617
String tlsCertPath;
1718
String tlsKeyPath;
1819
boolean insecure;
20+
Tracer tracer;
1921

2022
/**
2123
* Sets the {@link DataConverter} to use for converting serializable data payloads.
@@ -57,6 +59,17 @@ public DurableTaskGrpcClientBuilder port(int port) {
5759
return this;
5860
}
5961

62+
/**
63+
* Sets the {@link Tracer} to be used by the {@link DurableTaskClient} to emit traces.
64+
*
65+
* @param tracer to be used by the DurableTaskClient
66+
* @return this builder object
67+
*/
68+
public DurableTaskGrpcClientBuilder tracer(Tracer tracer) {
69+
this.tracer = tracer;
70+
return this;
71+
}
72+
6073
/**
6174
* Sets the path to the TLS CA certificate file for server authentication.
6275
* If not set, the system's default CA certificates will be used.

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ public void startAndBlock() {
200200
activityRequest.getName(),
201201
activityRequest.getInput().getValue(),
202202
activityRequest.getTaskExecutionId(),
203+
activityRequest.getParentTraceContext().getTraceParent(),
203204
activityRequest.getTaskId());
204205
} catch (Throwable e) {
205206
failureDetails = TaskFailureDetails.newBuilder()

client/src/main/java/io/dapr/durabletask/TaskActivityContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,10 @@ public interface TaskActivityContext {
3434
* @return the task id of the current task activity
3535
*/
3636
int getTaskId();
37+
38+
/**
39+
* Gets the trace parent id for the current workflow execution.
40+
* @return trace parent id
41+
*/
42+
String getTraceParentId();
3743
}

client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public TaskActivityExecutor(
1919
this.logger = logger;
2020
}
2121

22-
public String execute(String taskName, String input, String taskExecutionId, int taskId) throws Throwable {
22+
public String execute(String taskName, String input, String taskExecutionId, String parentTraceId, int taskId) throws Throwable {
2323
TaskActivityFactory factory = this.activityFactories.get(taskName);
2424
if (factory == null) {
2525
throw new IllegalStateException(
@@ -32,7 +32,7 @@ public String execute(String taskName, String input, String taskExecutionId, int
3232
String.format("The task factory '%s' returned a null TaskActivity object.", taskName));
3333
}
3434

35-
TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionId, taskId);
35+
TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionId, parentTraceId, taskId);
3636

3737
// Unhandled exceptions are allowed to escape
3838
Object output = activity.run(context);
@@ -48,13 +48,17 @@ private class TaskActivityContextImpl implements TaskActivityContext {
4848
private final String rawInput;
4949
private final String taskExecutionId;
5050
private final int taskId;
51+
private final String traceParentId;
5152

5253
private final DataConverter dataConverter = TaskActivityExecutor.this.dataConverter;
5354

54-
public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionId, int taskId) {
55+
public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionId,
56+
String traceParentId,
57+
int taskId) {
5558
this.name = activityName;
5659
this.rawInput = rawInput;
5760
this.taskExecutionId = taskExecutionId;
61+
this.traceParentId = traceParentId;
5862
this.taskId = taskId;
5963
}
6064

@@ -81,5 +85,10 @@ public String getTaskExecutionId() {
8185
public int getTaskId() {
8286
return this.taskId;
8387
}
88+
89+
@Override
90+
public String getTraceParentId() {
91+
return traceParentId;
92+
}
8493
}
8594
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
4b86756497d875b97f9a91051781b5711c1e4fa6
1+
474626b111db2d10e6d75b80dd277cf5993543ef

internal/durabletask-protobuf/protos/orchestrator_service.proto

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,17 @@ message PurgeInstancesRequest {
477477
PurgeInstanceFilter purgeInstanceFilter = 2;
478478
}
479479
bool recursive = 3;
480+
481+
// force will force a purge of a workflow, regardless of its current
482+
// runtime state, or whether an active worker can process it, the backend
483+
// will attempt to delete it anyway. This necessarily means the purging is
484+
// executed outside of the workflow state machine, and therefore, can lead
485+
// to corrupt state or broken workflow execution. This flag should
486+
// _only_ be used when the client knows the workflow is not being currently
487+
// processed. It is highly recommended to avoid using this flag unless
488+
// absolutely necessary.
489+
// Defaults to false.
490+
optional bool force = 4;
480491
}
481492

482493
message PurgeInstanceFilter {
@@ -739,6 +750,9 @@ service TaskHubSidecarService {
739750

740751
// Rerun a Workflow from a specific event ID of a workflow instance.
741752
rpc RerunWorkflowFromEvent(RerunWorkflowFromEventRequest) returns (RerunWorkflowFromEventResponse);
753+
754+
rpc ListInstanceIDs (ListInstanceIDsRequest) returns (ListInstanceIDsResponse);
755+
rpc GetInstanceHistory (GetInstanceHistoryRequest) returns (GetInstanceHistoryResponse);
742756
}
743757

744758
message GetWorkItemsRequest {
@@ -820,3 +834,36 @@ message RerunWorkflowFromEventRequest {
820834
message RerunWorkflowFromEventResponse {
821835
string newInstanceID = 1;
822836
}
837+
838+
// ListInstanceIDsRequest is used to list all orchestration instances.
839+
message ListInstanceIDsRequest {
840+
// continuationToken is the continuation token to use for pagination. This
841+
// is the token which the next page should start from. If not given, the
842+
// first page will be returned.
843+
optional string continuationToken = 1;
844+
845+
// pageSize is the maximum number of instances to return for this page. If
846+
// not given, an attempt is made to return all instances.
847+
optional uint32 pageSize = 2;
848+
}
849+
850+
// ListInstanceIDsResponse is the response to executing ListInstanceIDs.
851+
message ListInstanceIDsResponse {
852+
// instanceIds is the list of instance IDs returned.
853+
repeated string instanceIds = 1;
854+
855+
// continuationToken is the continuation token to use for pagination. If
856+
// there are no more pages, this will be null.
857+
optional string continuationToken = 2;
858+
}
859+
860+
// GetInstanceHistoryRequest is used to get the full history of an
861+
// orchestration instance.
862+
message GetInstanceHistoryRequest {
863+
string instanceId = 1;
864+
}
865+
866+
// GetInstanceHistoryResponse is the response to executing GetInstanceHistory.
867+
message GetInstanceHistoryResponse {
868+
repeated HistoryEvent events = 1;
869+
}

0 commit comments

Comments
 (0)