Skip to content

Commit bb0e21a

Browse files
committed
adding otel context wrapper for executor
Signed-off-by: salaboy <[email protected]>
1 parent de8c665 commit bb0e21a

File tree

2 files changed

+113
-38
lines changed

2 files changed

+113
-38
lines changed

client/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ dependencies {
2929
implementation 'com.google.protobuf:protobuf-java:3.25.5'
3030
implementation "io.micrometer:micrometer-observation:${micrometerVersion}"
3131
implementation "io.opentelemetry:opentelemetry-api:${otelVersion}"
32+
implementation "io.opentelemetry:opentelemetry-context:${otelVersion}"
3233
runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}"
3334

3435
compileOnly "org.apache.tomcat:annotations-api:6.0.53"

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 112 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@
99
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.WorkItem.RequestCase;
1010
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*;
1111

12+
import io.opentelemetry.context.Context;
13+
import io.opentelemetry.api.trace.Span;
14+
import io.opentelemetry.api.trace.SpanContext;
15+
import io.opentelemetry.api.trace.TraceFlags;
16+
import io.opentelemetry.api.trace.TraceState;
17+
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
18+
import io.opentelemetry.context.propagation.TextMapGetter;
19+
1220
import io.grpc.*;
1321

1422
import java.time.Duration;
@@ -72,7 +80,11 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
7280
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
7381
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval
7482
: DEFAULT_MAXIMUM_TIMER_INTERVAL;
75-
this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool();
83+
84+
// Create the executor and wrap it to propagate OpenTelemetry context across threads
85+
ExecutorService rawExecutor = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool();
86+
this.workerPool = Context.taskWrapping(rawExecutor);
87+
7688
this.isExecutorServiceManaged = builder.executorService == null;
7789
}
7890

@@ -186,59 +198,69 @@ public void startAndBlock() {
186198
});
187199
} else if (requestType == RequestCase.ACTIVITYREQUEST) {
188200
ActivityRequest activityRequest = workItem.getActivityRequest();
189-
logger.log(Level.FINEST,
190-
String.format("Processing activity request: %s for instance: %s}",
191-
activityRequest.getName(),
192-
activityRequest.getOrchestrationInstance().getInstanceId()));
201+
logger.log(Level.INFO,
202+
String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s",
203+
activityRequest.getName(),
204+
activityRequest.getOrchestrationInstance().getInstanceId(),
205+
Context.current()));
193206

207+
// Extract trace context from the ActivityRequest and set it as current
208+
Context traceContext = extractTraceContext(activityRequest);
209+
210+
// The workerPool is wrapped with Context.taskWrapping() so the trace context
211+
// will be automatically propagated to the worker thread
194212
// TODO: Error handling
195213
this.workerPool.submit(() -> {
214+
// Make the extracted trace context current so the OTel Java agent can instrument HTTP calls
215+
try (io.opentelemetry.context.Scope scope = traceContext.makeCurrent()) {
216+
logger.log(Level.INFO,
217+
String.format("Executing activity: %s in worker thread with trace context: %s",
218+
activityRequest.getName(),
219+
Context.current()));
196220
String output = null;
197221
TaskFailureDetails failureDetails = null;
198222
try {
199-
output = taskActivityExecutor.execute(
200-
activityRequest.getName(),
201-
activityRequest.getInput().getValue(),
202-
activityRequest.getTaskExecutionId(),
203-
activityRequest.getParentTraceContext().getTraceParent(),
204-
activityRequest.getTaskId());
223+
// Execute the activity - HTTP calls will now be instrumented with the correct trace context
224+
output = taskActivityExecutor.execute(
225+
activityRequest.getName(),
226+
activityRequest.getInput().getValue(),
227+
activityRequest.getTaskExecutionId(),
228+
activityRequest.getParentTraceContext().getTraceParent(),
229+
activityRequest.getTaskId());
205230
} catch (Throwable e) {
206-
failureDetails = TaskFailureDetails.newBuilder()
207-
.setErrorType(e.getClass().getName())
208-
.setErrorMessage(e.getMessage())
209-
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
210-
.build();
231+
failureDetails = TaskFailureDetails.newBuilder()
232+
.setErrorType(e.getClass().getName())
233+
.setErrorMessage(e.getMessage())
234+
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
235+
.build();
211236
}
212-
213237
ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder()
214-
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
215-
.setTaskId(activityRequest.getTaskId())
216-
.setCompletionToken(workItem.getCompletionToken());
217-
238+
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
239+
.setTaskId(activityRequest.getTaskId())
240+
.setCompletionToken(workItem.getCompletionToken());
218241
if (output != null) {
219-
responseBuilder.setResult(StringValue.of(output));
242+
responseBuilder.setResult(StringValue.of(output));
220243
}
221-
222244
if (failureDetails != null) {
223-
responseBuilder.setFailureDetails(failureDetails);
245+
responseBuilder.setFailureDetails(failureDetails);
224246
}
225-
226247
try {
227-
this.sidecarClient.completeActivityTask(responseBuilder.build());
248+
this.sidecarClient.completeActivityTask(responseBuilder.build());
228249
} catch (StatusRuntimeException e) {
229-
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
230-
logger.log(Level.WARNING,
231-
"The sidecar at address {0} is unavailable while completing the activity task.",
232-
this.getSidecarAddress());
233-
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
234-
logger.log(Level.WARNING,
235-
"Durable Task worker has disconnected from {0} while completing the activity task.",
236-
this.getSidecarAddress());
237-
} else {
238-
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.",
239-
this.getSidecarAddress());
240-
}
250+
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
251+
logger.log(Level.WARNING,
252+
"The sidecar at address {0} is unavailable while completing the activity task.",
253+
this.getSidecarAddress());
254+
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
255+
logger.log(Level.WARNING,
256+
"Durable Task worker has disconnected from {0} while completing the activity task.",
257+
this.getSidecarAddress());
258+
} else {
259+
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.",
260+
this.getSidecarAddress());
261+
}
241262
}
263+
}// end try-with-resources scope
242264
});
243265
} else if (requestType == RequestCase.HEALTHPING) {
244266
// No-op
@@ -310,4 +332,56 @@ private void shutDownWorkerPool() {
310332
private String getSidecarAddress() {
311333
return this.sidecarClient.getChannel().authority();
312334
}
335+
336+
/**
337+
* Extracts trace context from the ActivityRequest's ParentTraceContext field
338+
* and creates an OpenTelemetry Context with the parent span set.
339+
*
340+
* @param activityRequest The activity request containing the parent trace context
341+
* @return A Context with the parent span set, or the current context if no trace context is present
342+
*/
343+
private Context extractTraceContext(ActivityRequest activityRequest) {
344+
if (!activityRequest.hasParentTraceContext()) {
345+
logger.log(Level.FINE, "No parent trace context in activity request");
346+
return Context.current();
347+
}
348+
349+
TraceContext traceContext = activityRequest.getParentTraceContext();
350+
String traceparent = traceContext.getTraceParent();
351+
352+
if (traceparent == null || traceparent.isEmpty()) {
353+
logger.log(Level.FINE, "Empty traceparent in activity request");
354+
return Context.current();
355+
}
356+
357+
logger.log(Level.INFO,
358+
String.format("Extracting trace context from ActivityRequest: traceparent=%s", traceparent));
359+
360+
// Use W3CTraceContextPropagator to extract the trace context
361+
Map<String, String> carrier = new HashMap<>();
362+
carrier.put("traceparent", traceparent);
363+
if (traceContext.hasTraceState()) {
364+
carrier.put("tracestate", traceContext.getTraceState().getValue());
365+
}
366+
367+
TextMapGetter<Map<String, String>> getter = new TextMapGetter<Map<String, String>>() {
368+
@Override
369+
public Iterable<String> keys(Map<String, String> carrier) {
370+
return carrier.keySet();
371+
}
372+
373+
@Override
374+
public String get(Map<String, String> carrier, String key) {
375+
return carrier.get(key);
376+
}
377+
};
378+
379+
Context extractedContext = W3CTraceContextPropagator.getInstance()
380+
.extract(Context.current(), carrier, getter);
381+
382+
logger.log(Level.INFO,
383+
String.format("Extracted trace context: %s", extractedContext));
384+
385+
return extractedContext;
386+
}
313387
}

0 commit comments

Comments
 (0)