|
9 | 9 | import io.dapr.durabletask.implementation.protobuf.OrchestratorService.WorkItem.RequestCase; |
10 | 10 | import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*; |
11 | 11 |
|
| 12 | +import io.opentelemetry.context.Context; |
| 13 | +import io.opentelemetry.api.trace.Span; |
| 14 | +import io.opentelemetry.api.trace.SpanContext; |
| 15 | +import io.opentelemetry.api.trace.TraceFlags; |
| 16 | +import io.opentelemetry.api.trace.TraceState; |
| 17 | +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; |
| 18 | +import io.opentelemetry.context.propagation.TextMapGetter; |
| 19 | + |
12 | 20 | import io.grpc.*; |
13 | 21 |
|
14 | 22 | import java.time.Duration; |
@@ -72,7 +80,11 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { |
72 | 80 | this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); |
73 | 81 | this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval |
74 | 82 | : DEFAULT_MAXIMUM_TIMER_INTERVAL; |
75 | | - this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool(); |
| 83 | + |
| 84 | + // Create the executor and wrap it to propagate OpenTelemetry context across threads |
| 85 | + ExecutorService rawExecutor = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool(); |
| 86 | + this.workerPool = Context.taskWrapping(rawExecutor); |
| 87 | + |
76 | 88 | this.isExecutorServiceManaged = builder.executorService == null; |
77 | 89 | } |
78 | 90 |
|
@@ -186,59 +198,69 @@ public void startAndBlock() { |
186 | 198 | }); |
187 | 199 | } else if (requestType == RequestCase.ACTIVITYREQUEST) { |
188 | 200 | ActivityRequest activityRequest = workItem.getActivityRequest(); |
189 | | - logger.log(Level.FINEST, |
190 | | - String.format("Processing activity request: %s for instance: %s}", |
191 | | - activityRequest.getName(), |
192 | | - activityRequest.getOrchestrationInstance().getInstanceId())); |
| 201 | + logger.log(Level.INFO, |
| 202 | + String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s", |
| 203 | + activityRequest.getName(), |
| 204 | + activityRequest.getOrchestrationInstance().getInstanceId(), |
| 205 | + Context.current())); |
193 | 206 |
|
| 207 | + // Extract trace context from the ActivityRequest and set it as current |
| 208 | + Context traceContext = extractTraceContext(activityRequest); |
| 209 | + |
| 210 | + // The workerPool is wrapped with Context.taskWrapping() so the trace context |
| 211 | + // will be automatically propagated to the worker thread |
194 | 212 | // TODO: Error handling |
195 | 213 | this.workerPool.submit(() -> { |
| 214 | + // Make the extracted trace context current so the OTel Java agent can instrument HTTP calls |
| 215 | + try (io.opentelemetry.context.Scope scope = traceContext.makeCurrent()) { |
| 216 | + logger.log(Level.INFO, |
| 217 | + String.format("Executing activity: %s in worker thread with trace context: %s", |
| 218 | + activityRequest.getName(), |
| 219 | + Context.current())); |
196 | 220 | String output = null; |
197 | 221 | TaskFailureDetails failureDetails = null; |
198 | 222 | try { |
199 | | - output = taskActivityExecutor.execute( |
200 | | - activityRequest.getName(), |
201 | | - activityRequest.getInput().getValue(), |
202 | | - activityRequest.getTaskExecutionId(), |
203 | | - activityRequest.getParentTraceContext().getTraceParent(), |
204 | | - activityRequest.getTaskId()); |
| 223 | + // Execute the activity - HTTP calls will now be instrumented with the correct trace context |
| 224 | + output = taskActivityExecutor.execute( |
| 225 | + activityRequest.getName(), |
| 226 | + activityRequest.getInput().getValue(), |
| 227 | + activityRequest.getTaskExecutionId(), |
| 228 | + activityRequest.getParentTraceContext().getTraceParent(), |
| 229 | + activityRequest.getTaskId()); |
205 | 230 | } catch (Throwable e) { |
206 | | - failureDetails = TaskFailureDetails.newBuilder() |
207 | | - .setErrorType(e.getClass().getName()) |
208 | | - .setErrorMessage(e.getMessage()) |
209 | | - .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) |
210 | | - .build(); |
| 231 | + failureDetails = TaskFailureDetails.newBuilder() |
| 232 | + .setErrorType(e.getClass().getName()) |
| 233 | + .setErrorMessage(e.getMessage()) |
| 234 | + .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) |
| 235 | + .build(); |
211 | 236 | } |
212 | | - |
213 | 237 | ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder() |
214 | | - .setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId()) |
215 | | - .setTaskId(activityRequest.getTaskId()) |
216 | | - .setCompletionToken(workItem.getCompletionToken()); |
217 | | - |
| 238 | + .setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId()) |
| 239 | + .setTaskId(activityRequest.getTaskId()) |
| 240 | + .setCompletionToken(workItem.getCompletionToken()); |
218 | 241 | if (output != null) { |
219 | | - responseBuilder.setResult(StringValue.of(output)); |
| 242 | + responseBuilder.setResult(StringValue.of(output)); |
220 | 243 | } |
221 | | - |
222 | 244 | if (failureDetails != null) { |
223 | | - responseBuilder.setFailureDetails(failureDetails); |
| 245 | + responseBuilder.setFailureDetails(failureDetails); |
224 | 246 | } |
225 | | - |
226 | 247 | try { |
227 | | - this.sidecarClient.completeActivityTask(responseBuilder.build()); |
| 248 | + this.sidecarClient.completeActivityTask(responseBuilder.build()); |
228 | 249 | } catch (StatusRuntimeException e) { |
229 | | - if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { |
230 | | - logger.log(Level.WARNING, |
231 | | - "The sidecar at address {0} is unavailable while completing the activity task.", |
232 | | - this.getSidecarAddress()); |
233 | | - } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { |
234 | | - logger.log(Level.WARNING, |
235 | | - "Durable Task worker has disconnected from {0} while completing the activity task.", |
236 | | - this.getSidecarAddress()); |
237 | | - } else { |
238 | | - logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", |
239 | | - this.getSidecarAddress()); |
240 | | - } |
| 250 | + if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { |
| 251 | + logger.log(Level.WARNING, |
| 252 | + "The sidecar at address {0} is unavailable while completing the activity task.", |
| 253 | + this.getSidecarAddress()); |
| 254 | + } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { |
| 255 | + logger.log(Level.WARNING, |
| 256 | + "Durable Task worker has disconnected from {0} while completing the activity task.", |
| 257 | + this.getSidecarAddress()); |
| 258 | + } else { |
| 259 | + logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", |
| 260 | + this.getSidecarAddress()); |
| 261 | + } |
241 | 262 | } |
| 263 | + }// end try-with-resources scope |
242 | 264 | }); |
243 | 265 | } else if (requestType == RequestCase.HEALTHPING) { |
244 | 266 | // No-op |
@@ -310,4 +332,56 @@ private void shutDownWorkerPool() { |
310 | 332 | private String getSidecarAddress() { |
311 | 333 | return this.sidecarClient.getChannel().authority(); |
312 | 334 | } |
| 335 | + |
| 336 | + /** |
| 337 | + * Extracts trace context from the ActivityRequest's ParentTraceContext field |
| 338 | + * and creates an OpenTelemetry Context with the parent span set. |
| 339 | + * |
| 340 | + * @param activityRequest The activity request containing the parent trace context |
| 341 | + * @return A Context with the parent span set, or the current context if no trace context is present |
| 342 | + */ |
| 343 | + private Context extractTraceContext(ActivityRequest activityRequest) { |
| 344 | + if (!activityRequest.hasParentTraceContext()) { |
| 345 | + logger.log(Level.FINE, "No parent trace context in activity request"); |
| 346 | + return Context.current(); |
| 347 | + } |
| 348 | + |
| 349 | + TraceContext traceContext = activityRequest.getParentTraceContext(); |
| 350 | + String traceparent = traceContext.getTraceParent(); |
| 351 | + |
| 352 | + if (traceparent == null || traceparent.isEmpty()) { |
| 353 | + logger.log(Level.FINE, "Empty traceparent in activity request"); |
| 354 | + return Context.current(); |
| 355 | + } |
| 356 | + |
| 357 | + logger.log(Level.INFO, |
| 358 | + String.format("Extracting trace context from ActivityRequest: traceparent=%s", traceparent)); |
| 359 | + |
| 360 | + // Use W3CTraceContextPropagator to extract the trace context |
| 361 | + Map<String, String> carrier = new HashMap<>(); |
| 362 | + carrier.put("traceparent", traceparent); |
| 363 | + if (traceContext.hasTraceState()) { |
| 364 | + carrier.put("tracestate", traceContext.getTraceState().getValue()); |
| 365 | + } |
| 366 | + |
| 367 | + TextMapGetter<Map<String, String>> getter = new TextMapGetter<Map<String, String>>() { |
| 368 | + @Override |
| 369 | + public Iterable<String> keys(Map<String, String> carrier) { |
| 370 | + return carrier.keySet(); |
| 371 | + } |
| 372 | + |
| 373 | + @Override |
| 374 | + public String get(Map<String, String> carrier, String key) { |
| 375 | + return carrier.get(key); |
| 376 | + } |
| 377 | + }; |
| 378 | + |
| 379 | + Context extractedContext = W3CTraceContextPropagator.getInstance() |
| 380 | + .extract(Context.current(), carrier, getter); |
| 381 | + |
| 382 | + logger.log(Level.INFO, |
| 383 | + String.format("Extracted trace context: %s", extractedContext)); |
| 384 | + |
| 385 | + return extractedContext; |
| 386 | + } |
313 | 387 | } |
0 commit comments