Skip to content

Commit d623475

Browse files
committed
wip: tracing
Signed-off-by: Javier Aliaga <[email protected]>
1 parent 08652db commit d623475

File tree

7 files changed

+173
-21
lines changed

7 files changed

+173
-21
lines changed

durabletask-client/pom.xml

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,18 @@
7474
<artifactId>testcontainers</artifactId>
7575
<scope>test</scope>
7676
</dependency>
77+
<dependency>
78+
<groupId>io.micrometer</groupId>
79+
<artifactId>micrometer-observation</artifactId>
80+
</dependency>
81+
<dependency>
82+
<groupId>io.opentelemetry</groupId>
83+
<artifactId>opentelemetry-api</artifactId>
84+
</dependency>
85+
<dependency>
86+
<groupId>io.opentelemetry</groupId>
87+
<artifactId>opentelemetry-context</artifactId>
88+
</dependency>
7789
</dependencies>
7890
<build>
7991
<plugins>
@@ -109,24 +121,32 @@
109121
</executions>
110122
</plugin>
111123
<plugin>
112-
<groupId>org.xolstice.maven.plugins</groupId>
124+
<groupId>io.github.ascopes</groupId>
113125
<artifactId>protobuf-maven-plugin</artifactId>
114126
<version>${protobuf-maven-plugin.version}</version>
115127
<configuration>
116-
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
117-
<pluginId>grpc-java</pluginId>
118-
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
119-
<protoSourceRoot>${protobuf.input.directory}</protoSourceRoot>
128+
<protocVersion>${protobuf.version}</protocVersion>
129+
<embedSourcesInClassOutputs>true</embedSourcesInClassOutputs>
130+
<sourceDirectories>
131+
<sourceDirectory>${protobuf.input.directory}</sourceDirectory>
132+
</sourceDirectories>
133+
<binaryMavenPlugins>
134+
<binaryMavenPlugin>
135+
<groupId>io.grpc</groupId>
136+
<artifactId>protoc-gen-grpc-java</artifactId>
137+
<version>${grpc.version}</version>
138+
</binaryMavenPlugin>
139+
</binaryMavenPlugins>
120140
</configuration>
121141
<executions>
122142
<execution>
123143
<goals>
124-
<goal>compile</goal>
125-
<goal>compile-custom</goal>
144+
<goal>generate</goal>
126145
</goals>
127146
</execution>
128147
</executions>
129148
</plugin>
149+
130150
<plugin>
131151
<groupId>org.apache.maven.plugins</groupId>
132152
<artifactId>maven-source-plugin</artifactId>

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.google.protobuf.StringValue;
1717
import com.google.protobuf.Timestamp;
1818
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
19+
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TraceContext;
1920
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
2021
import io.grpc.Channel;
2122
import io.grpc.ChannelCredentials;
@@ -28,6 +29,9 @@
2829
import io.grpc.netty.GrpcSslContexts;
2930
import io.grpc.netty.NettyChannelBuilder;
3031
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
32+
import io.opentelemetry.api.OpenTelemetry;
33+
import io.opentelemetry.api.trace.Span;
34+
import io.opentelemetry.api.trace.Tracer;
3135

3236
import javax.annotation.Nullable;
3337
import java.io.FileInputStream;
@@ -41,6 +45,8 @@
4145
import java.util.UUID;
4246
import java.util.concurrent.TimeUnit;
4347
import java.util.concurrent.TimeoutException;
48+
import java.util.concurrent.atomic.AtomicReference;
49+
import java.util.function.Consumer;
4450
import java.util.logging.Logger;
4551

4652
/**
@@ -57,6 +63,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
5763
private final DataConverter dataConverter;
5864
private final ManagedChannel managedSidecarChannel;
5965
private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient;
66+
private final Tracer tracer;
6067

6168
DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) {
6269
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
@@ -130,6 +137,12 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
130137
sidecarGrpcChannel = this.managedSidecarChannel;
131138
}
132139

140+
if (builder.tracer != null) {
141+
this.tracer = builder.tracer;
142+
} else {
143+
this.tracer = OpenTelemetry.noop().getTracer("DurableTaskGrpcClient");
144+
}
145+
133146
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
134147
}
135148

@@ -188,9 +201,28 @@ public String scheduleNewOrchestrationInstance(
188201
builder.setScheduledStartTimestamp(ts);
189202
}
190203

191-
OrchestratorService.CreateInstanceRequest request = builder.build();
192-
OrchestratorService.CreateInstanceResponse response = this.sidecarClient.startInstance(request);
193-
return response.getInstanceId();
204+
AtomicReference<OrchestratorService.CreateInstanceResponse> response = new AtomicReference<>();
205+
206+
this.observe("dapr.workflow.grpc.startInstance", (TraceContext context) -> {
207+
builder.setParentTraceContext(context);
208+
OrchestratorService.CreateInstanceRequest request = builder.build();
209+
response.set(this.sidecarClient.startInstance(request));
210+
});
211+
212+
return response.get().getInstanceId();
213+
}
214+
215+
private void observe(String spanName, Consumer<TraceContext> fn) {
216+
Span span = tracer.spanBuilder(spanName).startSpan();
217+
try {
218+
TraceContext.Builder traceContextBuilder = TraceContext.newBuilder();
219+
traceContextBuilder.setTraceParent(span.getSpanContext().getTraceId());
220+
var context = traceContextBuilder.build();
221+
222+
fn.accept(context);
223+
} finally {
224+
span.end();
225+
}
194226
}
195227

196228
@Override
@@ -206,8 +238,11 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload)
206238
builder.setInput(StringValue.of(serializedPayload));
207239
}
208240

209-
OrchestratorService.RaiseEventRequest request = builder.build();
210-
this.sidecarClient.raiseEvent(request);
241+
this.observe("dapr.workflow.grpc.raiseEvent", (TraceContext context) -> {
242+
OrchestratorService.RaiseEventRequest request = builder.build();
243+
this.sidecarClient.raiseEvent(request);
244+
});
245+
211246
}
212247

213248
@Override

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.dapr.durabletask;
1515

1616
import io.grpc.Channel;
17+
import io.opentelemetry.api.trace.Tracer;
1718

1819
/**
1920
* Builder class for constructing new {@link DurableTaskClient} objects that communicate with a sidecar process
@@ -27,6 +28,7 @@ public final class DurableTaskGrpcClientBuilder {
2728
String tlsCertPath;
2829
String tlsKeyPath;
2930
boolean insecure;
31+
Tracer tracer;
3032

3133
/**
3234
* Sets the {@link DataConverter} to use for converting serializable data payloads.
@@ -57,6 +59,17 @@ public DurableTaskGrpcClientBuilder grpcChannel(Channel channel) {
5759
return this;
5860
}
5961

62+
/**
63+
* Sets the Tracer object to be used by DurableTaskClient to emit traces.
64+
*
65+
* @param tracer to be used by the DurableTaskClient
66+
* @return this builder object
67+
*/
68+
public DurableTaskGrpcClientBuilder tracer(Tracer tracer) {
69+
this.tracer = tracer;
70+
return this;
71+
}
72+
6073
/**
6174
* Sets the gRPC endpoint port to connect to. If not specified, the default Durable Task port number will be used.
6275
*

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,14 @@
2222
import io.grpc.ManagedChannelBuilder;
2323
import io.grpc.Status;
2424
import io.grpc.StatusRuntimeException;
25+
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
26+
import io.opentelemetry.context.Context;
27+
import io.opentelemetry.context.propagation.TextMapGetter;
2528

2629
import java.time.Duration;
2730
import java.util.HashMap;
2831
import java.util.Iterator;
32+
import java.util.Map;
2933
import java.util.concurrent.ExecutorService;
3034
import java.util.concurrent.Executors;
3135
import java.util.concurrent.TimeUnit;
@@ -85,7 +89,11 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
8589
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
8690
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval
8791
: DEFAULT_MAXIMUM_TIMER_INTERVAL;
88-
this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool();
92+
93+
ExecutorService rawExecutor = builder.executorService != null
94+
? builder.executorService : Executors.newCachedThreadPool();
95+
this.workerPool = Context.taskWrapping(rawExecutor);
96+
8997
this.isExecutorServiceManaged = builder.executorService == null;
9098
}
9199

@@ -201,10 +209,16 @@ public void startAndBlock() {
201209
});
202210
} else if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) {
203211
OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest();
204-
logger.log(Level.FINEST,
205-
String.format("Processing activity request: %s for instance: %s}",
212+
213+
214+
logger.log(Level.INFO,
215+
String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s",
206216
activityRequest.getName(),
207-
activityRequest.getOrchestrationInstance().getInstanceId()));
217+
activityRequest.getOrchestrationInstance().getInstanceId(),
218+
Context.current()));
219+
220+
// Extract trace context from the ActivityRequest and set it as current
221+
Context traceContext = extractTraceContext(activityRequest);
208222

209223
// TODO: Error handling
210224
this.workerPool.submit(() -> {
@@ -215,7 +229,8 @@ public void startAndBlock() {
215229
activityRequest.getName(),
216230
activityRequest.getInput().getValue(),
217231
activityRequest.getTaskExecutionId(),
218-
activityRequest.getTaskId());
232+
activityRequest.getTaskId(),
233+
activityRequest.getParentTraceContext().getTraceParent());
219234
} catch (Throwable e) {
220235
failureDetails = TaskFailureDetails.newBuilder()
221236
.setErrorType(e.getClass().getName())
@@ -325,4 +340,57 @@ private void shutDownWorkerPool() {
325340
private String getSidecarAddress() {
326341
return this.sidecarClient.getChannel().authority();
327342
}
343+
344+
/**
345+
* Extracts trace context from the ActivityRequest's ParentTraceContext field
346+
* and creates an OpenTelemetry Context with the parent span set.
347+
*
348+
* @param activityRequest The activity request containing the parent trace context
349+
* @return A Context with the parent span set, or the current context if no trace context is present
350+
*/
351+
private Context extractTraceContext(OrchestratorService.ActivityRequest activityRequest) {
352+
if (!activityRequest.hasParentTraceContext()) {
353+
logger.log(Level.FINE, "No parent trace context in activity request");
354+
return Context.current();
355+
}
356+
357+
OrchestratorService.TraceContext traceContext = activityRequest.getParentTraceContext();
358+
String traceParent = traceContext.getTraceParent();
359+
360+
if (traceParent.isEmpty()) {
361+
logger.log(Level.FINE, "Empty traceparent in activity request");
362+
return Context.current();
363+
}
364+
365+
logger.log(Level.INFO,
366+
String.format("Extracting trace context from ActivityRequest: traceparent=%s", traceParent));
367+
368+
// Use W3CTraceContextPropagator to extract the trace context
369+
Map<String, String> carrier = new HashMap<>();
370+
carrier.put("traceparent", traceParent);
371+
if (traceContext.hasTraceState()) {
372+
carrier.put("tracestate", traceContext.getTraceState().getValue());
373+
}
374+
375+
TextMapGetter<Map<String, String>> getter = new TextMapGetter<Map<String, String>>() {
376+
@Override
377+
public Iterable<String> keys(Map<String, String> carrier) {
378+
return carrier.keySet();
379+
}
380+
381+
@Override
382+
public String get(Map<String, String> carrier, String key) {
383+
return carrier.get(key);
384+
}
385+
};
386+
387+
388+
Context extractedContext = W3CTraceContextPropagator.getInstance()
389+
.extract(Context.current(), carrier, getter);
390+
391+
logger.log(Level.INFO,
392+
String.format("Extracted trace context: %s", extractedContext));
393+
394+
return extractedContext;
395+
}
328396
}

durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,10 @@ public interface TaskActivityContext {
4848
* @return the task id of the current task activity
4949
*/
5050
int getTaskId();
51+
52+
/**
53+
* Gets the trace parent id for the current workflow execution.
54+
* @return trace parent id
55+
*/
56+
String getTraceParent();
5157
}

durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ public TaskActivityExecutor(
3030
this.logger = logger;
3131
}
3232

33-
public String execute(String taskName, String input, String taskExecutionId, int taskId) throws Throwable {
33+
public String execute(String taskName, String input,
34+
String taskExecutionId, int taskId, String traceParent) throws Throwable {
3435
TaskActivityFactory factory = this.activityFactories.get(taskName);
3536
if (factory == null) {
3637
throw new IllegalStateException(
@@ -43,7 +44,8 @@ public String execute(String taskName, String input, String taskExecutionId, int
4344
String.format("The task factory '%s' returned a null TaskActivity object.", taskName));
4445
}
4546

46-
TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionId, taskId);
47+
TaskActivityContextImpl context = new TaskActivityContextImpl(
48+
taskName, input, taskExecutionId, taskId, traceParent);
4749

4850
// Unhandled exceptions are allowed to escape
4951
Object output = activity.run(context);
@@ -59,14 +61,17 @@ private class TaskActivityContextImpl implements TaskActivityContext {
5961
private final String rawInput;
6062
private final String taskExecutionId;
6163
private final int taskId;
64+
private final String traceParent;
6265

6366
private final DataConverter dataConverter = TaskActivityExecutor.this.dataConverter;
6467

65-
public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionId, int taskId) {
68+
public TaskActivityContextImpl(String activityName, String rawInput,
69+
String taskExecutionId, int taskId, String traceParent) {
6670
this.name = activityName;
6771
this.rawInput = rawInput;
6872
this.taskExecutionId = taskExecutionId;
6973
this.taskId = taskId;
74+
this.traceParent = traceParent;
7075
}
7176

7277
@Override
@@ -92,5 +97,10 @@ public String getTaskExecutionId() {
9297
public int getTaskId() {
9398
return this.taskId;
9499
}
100+
101+
@Override
102+
public String getTraceParent() {
103+
return this.traceParent;
104+
}
95105
}
96106
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
1717
<grpc.version>1.69.0</grpc.version>
1818
<protobuf.version>3.25.5</protobuf.version>
19-
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.16.0-rc.5/dapr/proto</dapr.proto.baseurl>
19+
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/master/dapr/proto</dapr.proto.baseurl>
2020
<durabletask.proto.url>https://raw.githubusercontent.com/dapr/durabletask-protobuf/main/protos/orchestrator_service.proto</durabletask.proto.url>
2121
<dapr.sdk.version>1.17.0-SNAPSHOT</dapr.sdk.version>
2222
<os-maven-plugin.version>1.7.1</os-maven-plugin.version>

0 commit comments

Comments
 (0)