Skip to content

Commit a0e834e

Browse files
committed
Set thread service name for spark tasks
1 parent ba30c88 commit a0e834e

File tree

2 files changed

+27
-4
lines changed

2 files changed

+27
-4
lines changed

dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
package datadog.trace.instrumentation.spark;
22

33
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
4+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
45
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
56
import datadog.trace.bootstrap.instrumentation.decorator.BaseDecorator;
7+
import java.lang.reflect.Field;
8+
import java.util.Properties;
69
import org.apache.spark.executor.Executor;
710
import org.apache.spark.executor.TaskMetrics;
811

912
public class SparkExecutorDecorator extends BaseDecorator {
1013
public static final CharSequence SPARK_TASK = UTF8BytesString.create("spark.task");
1114
public static final CharSequence SPARK = UTF8BytesString.create("spark");
1215
public static SparkExecutorDecorator DECORATE = new SparkExecutorDecorator();
16+
private final String propSparkAppName = "spark.app.name";
1317

1418
@Override
1519
protected String[] instrumentationNames() {
@@ -26,12 +30,29 @@ protected CharSequence component() {
2630
return SPARK;
2731
}
2832

29-
public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner) {
33+
public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner, Object taskDescription) {
3034
span.setTag("task_id", taskRunner.taskId());
3135
span.setTag("task_thread_name", taskRunner.threadName());
36+
37+
if (taskDescription != null) {
38+
try {
39+
Field prop = taskDescription.getClass().getDeclaredField("properties");
40+
prop.setAccessible(true);
41+
Properties props = (Properties) prop.get(taskDescription);
42+
String appName = props.getProperty(propSparkAppName);
43+
if (appName != null) {
44+
AgentTracer.get()
45+
.getDataStreamsMonitoring()
46+
.setThreadServiceName(taskRunner.getThreadId(), appName);
47+
}
48+
} catch (Exception ignored) {
49+
}
50+
}
3251
}
3352

3453
public void onTaskEnd(AgentSpan span, Executor.TaskRunner taskRunner) {
54+
AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName(taskRunner.getThreadId());
55+
3556
// task is set by spark in run() by deserializing the task binary coming from the driver
3657
if (taskRunner.task() == null) {
3758
return;
@@ -50,7 +71,7 @@ public void onTaskEnd(AgentSpan span, Executor.TaskRunner taskRunner) {
5071
span.setTag("app_attempt_id", taskRunner.task().appAttemptId().get());
5172
}
5273
span.setTag(
53-
"application_name", taskRunner.task().localProperties().getProperty("spark.app.name"));
74+
"application_name", taskRunner.task().localProperties().getProperty(propSparkAppName));
5475

5576
TaskMetrics metrics = taskRunner.task().metrics();
5677
span.setMetric("spark.executor_deserialize_time", metrics.executorDeserializeTime());

dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorInstrumentation.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,13 @@ public void methodAdvice(MethodTransformer transformer) {
5252

5353
public static final class RunAdvice {
5454
@Advice.OnMethodEnter(suppress = Throwable.class)
55-
public static AgentScope enter(@Advice.This Executor.TaskRunner taskRunner) {
55+
public static AgentScope enter(
56+
@Advice.FieldValue("taskDescription") final Object taskDescription,
57+
@Advice.This Executor.TaskRunner taskRunner) {
5658
final AgentSpan span = startSpan("spark-executor", SPARK_TASK);
5759

5860
DECORATE.afterStart(span);
59-
DECORATE.onTaskStart(span, taskRunner);
61+
DECORATE.onTaskStart(span, taskRunner, taskDescription);
6062

6163
return activateSpan(span);
6264
}

0 commit comments

Comments
 (0)