44import datadog .trace .bootstrap .instrumentation .api .AgentTracer ;
55import datadog .trace .bootstrap .instrumentation .api .UTF8BytesString ;
66import datadog .trace .bootstrap .instrumentation .decorator .BaseDecorator ;
7+ import java .lang .invoke .MethodHandle ;
8+ import java .lang .invoke .MethodHandles ;
79import java .lang .reflect .Field ;
810import java .util .Properties ;
911import org .apache .spark .executor .Executor ;
1012import org .apache .spark .executor .TaskMetrics ;
13+ import org .slf4j .Logger ;
14+ import org .slf4j .LoggerFactory ;
1115
1216public class SparkExecutorDecorator extends BaseDecorator {
17+ private static final Logger log = LoggerFactory .getLogger (SparkExecutorDecorator .class );
18+
1319 public static final CharSequence SPARK_TASK = UTF8BytesString .create ("spark.task" );
1420 public static final CharSequence SPARK = UTF8BytesString .create ("spark" );
1521 public static SparkExecutorDecorator DECORATE = new SparkExecutorDecorator ();
1622 private final String propSparkAppName = "spark.app.name" ;
23+ private static final String TASK_DESCRIPTION_CLASSNAME =
24+ "org.apache.spark.scheduler.TaskDescription" ;
25+ private static final MethodHandles .Lookup lookup = MethodHandles .lookup ();
26+ private static final MethodHandle propertiesField_mh ;
27+
28+ private static Class <?> initClass () {
29+ try {
30+ return Class .forName (
31+ SparkExecutorDecorator .TASK_DESCRIPTION_CLASSNAME ,
32+ false ,
33+ SparkExecutorDecorator .class .getClassLoader ());
34+ } catch (ClassNotFoundException e ) {
35+ log .debug ("Can't find class '{}'" , TASK_DESCRIPTION_CLASSNAME , e );
36+ }
37+ return null ;
38+ }
39+
40+ private static MethodHandle getFieldGetter () {
41+ Class <?> cls = initClass ();
42+
43+ try {
44+ if (cls != null ) {
45+ Field field = cls .getDeclaredField ("properties" );
46+ field .setAccessible (true );
47+
48+ return lookup .unreflectGetter (field );
49+ }
50+
51+ } catch (NoSuchFieldException | IllegalAccessException e ) {
52+ log .debug ("Can't find and unreflect declared field for '{}'" , TASK_DESCRIPTION_CLASSNAME );
53+ }
54+
55+ return null ;
56+ }
57+
58+ static {
59+ propertiesField_mh = getFieldGetter ();
60+ }
1761
1862 @ Override
1963 protected String [] instrumentationNames () {
@@ -34,18 +78,18 @@ public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner, Object t
3478 span .setTag ("task_id" , taskRunner .taskId ());
3579 span .setTag ("task_thread_name" , taskRunner .threadName ());
3680
37- if (taskDescription != null ) {
81+ if (taskDescription != null && propertiesField_mh != null ) {
3882 try {
39- Field prop = taskDescription . getClass (). getDeclaredField ( "properties" );
40- prop . setAccessible ( true );
41- Properties props = ( Properties ) prop . get ( taskDescription );
42- String appName = props . getProperty ( propSparkAppName );
43- if ( appName != null ) {
44- AgentTracer . get ()
45- . getDataStreamsMonitoring ()
46- . setThreadServiceName ( taskRunner . getThreadId (), appName );
83+ Properties props = ( Properties ) propertiesField_mh . invoke ( taskDescription );
84+ if ( props != null ) {
85+ String appName = props . getProperty ( propSparkAppName );
86+ if ( appName != null ) {
87+ AgentTracer . get ()
88+ . getDataStreamsMonitoring ()
89+ . setThreadServiceName ( taskRunner . getThreadId (), appName );
90+ }
4791 }
48- } catch (Exception ignored ) {
92+ } catch (Throwable ignored ) {
4993 }
5094 }
5195 }
0 commit comments