2525import java .lang .invoke .MethodType ;
2626import java .time .OffsetDateTime ;
2727import java .time .format .DateTimeParseException ;
28- import java .util .ArrayList ;
29- import java .util .Arrays ;
30- import java .util .Collection ;
31- import java .util .HashMap ;
32- import java .util .Iterator ;
33- import java .util .LinkedHashMap ;
34- import java .util .List ;
35- import java .util .Map ;
36- import java .util .Optional ;
37- import java .util .Properties ;
38- import java .util .UUID ;
28+ import java .util .*;
29+ import java .util .stream .Collectors ;
3930import org .apache .spark .ExceptionFailure ;
4031import org .apache .spark .SparkConf ;
4132import org .apache .spark .TaskFailedReason ;
5647import scala .Tuple2 ;
5748import scala .collection .JavaConverters ;
5849
// NOTE: the OpenLineage listener is intentionally NOT imported here; it is an optional
// runtime dependency resolved reflectively in loadOlSparkListener().
5952/**
6053 * Implementation of the SparkListener {@link SparkListener} to generate spans from the execution of
6154 * a spark application.
6558 * still needed
6659 */
6760public abstract class AbstractDatadogSparkListener extends SparkListener {
68- private static final Logger log = LoggerFactory .getLogger (AbstractDatadogSparkListener .class );
61+ protected static final Logger log = LoggerFactory .getLogger (AbstractDatadogSparkListener .class );
6962 private static final ObjectMapper objectMapper = new ObjectMapper ();
7063 public static volatile AbstractDatadogSparkListener listener = null ;
7164 public static volatile boolean finishTraceOnApplicationEnd = true ;
@@ -123,8 +116,10 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
123116 private long availableExecutorTime = 0 ;
124117
125118 private volatile boolean applicationEnded = false ;
119+ private SparkListener openLineageSparkListener = null ;
126120
127121 public AbstractDatadogSparkListener (SparkConf sparkConf , String appId , String sparkVersion ) {
122+ log .error ("STARTING DD SPARK LISTENER" );
128123 tracer = AgentTracer .get ();
129124
130125 this .sparkConf = sparkConf ;
@@ -151,8 +146,72 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
151146 finishApplication (System .currentTimeMillis (), null , 0 , null );
152147 }
153148 }));
149+ log .error ("Created datadog spark listener: {}" , this .getClass ().getSimpleName ());
150+ loadOlSparkListener ();
151+ }
154152
155- log .info ("Created datadog spark listener: {}" , this .getClass ().getSimpleName ());
153+ void loadOlSparkListener () {
154+ List <ClassLoader > availableClassloaders =
155+ Thread .getAllStackTraces ().keySet ().stream ()
156+ .map (Thread ::getContextClassLoader )
157+ .filter (Objects ::nonNull )
158+ .collect (Collectors .toList ());
159+ String className = "io.openlineage.spark.agent.OpenLineageSparkListener" ;
160+ Class clazz = null ;
161+ try {
162+ clazz = Class .forName (className );
163+ } catch (Exception e ) {
164+ log .error ("Failed to load OL Spark Listener via Class.forName: {}" , e .toString ());
165+ for (ClassLoader classLoader : availableClassloaders ) {
166+ try {
167+ clazz = classLoader .loadClass (className );
168+ log .error ("Loaded Spark Listener via classLoader: {}" , classLoader );
169+ break ;
170+ } catch (Exception ex ) {
171+ log .error (
172+ "Failed to load OL Spark Listener via loadClass via ClassLoader {} - {}" ,
173+ classLoader ,
174+ ex .toString ());
175+ }
176+ try {
177+ clazz = classLoader .getParent ().loadClass (className );
178+ log .error (
179+ "Loaded Spark Listener via parent classLoader: {} for CL {}" ,
180+ classLoader .getParent (),
181+ classLoader );
182+ break ;
183+ } catch (Exception ex ) {
184+ log .error (
185+ "Failed to load OL Spark Listener via loadClass via parent ClassLoader {} - {}" ,
186+ classLoader .getParent (),
187+ ex .toString ());
188+ }
189+ }
190+ }
191+ if (clazz == null ) {
192+ try {
193+ clazz = ClassLoader .getSystemClassLoader ().loadClass (className );
194+ log .error (
195+ "Loaded Spark Listener via system classLoader: {}" , ClassLoader .getSystemClassLoader ());
196+ } catch (Exception ex ) {
197+ log .error (
198+ "Failed to load OL Spark Listener via loadClass via SystemClassLoader {}" ,
199+ ex .toString ());
200+ }
201+ }
202+ if (clazz == null ) {
203+ return ;
204+ }
205+ try {
206+ sparkConf .set ("spark.openlineage.transport" , "true" );
207+ openLineageSparkListener =
208+ (SparkListener ) clazz .getDeclaredConstructor (SparkConf .class ).newInstance (sparkConf );
209+ ;
210+ log .error (
211+ "Created OL spark listener: {}" , openLineageSparkListener .getClass ().getSimpleName ());
212+ } catch (Exception e ) {
213+ log .error ("Failed to instantiate OL Spark Listener: {}" , e .toString ());
214+ }
156215 }
157216
158217 /** Resource name of the spark job. Provide an implementation based on a specific scala version */
@@ -176,6 +235,9 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
176235 @ Override
177236 public synchronized void onApplicationStart (SparkListenerApplicationStart applicationStart ) {
178237 this .applicationStart = applicationStart ;
238+ if (this .openLineageSparkListener != null ) {
239+ this .openLineageSparkListener .onApplicationStart (applicationStart );
240+ }
179241 }
180242
181243 private void initApplicationSpanIfNotInitialized () {
@@ -237,51 +299,57 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
237299 if (finishTraceOnApplicationEnd ) {
238300 finishApplication (applicationEnd .time (), null , 0 , null );
239301 }
302+ if (this .openLineageSparkListener != null ) {
303+ this .openLineageSparkListener .onApplicationEnd (applicationEnd );
304+ }
240305 }
241306
242307 public synchronized void finishApplication (
243308 long time , Throwable throwable , int exitCode , String msg ) {
244309 log .info ("Finishing spark application trace" );
310+ return ;
245311
246- if (applicationEnded ) {
247- return ;
248- }
249- applicationEnded = true ;
250-
251- if (applicationSpan == null && jobCount > 0 ) {
252- // If the application span is not initialized, but spark jobs have been executed, all those
253- // spark jobs were databricks or streaming. In this case we don't send the application span
254- return ;
255- }
256- initApplicationSpanIfNotInitialized ();
257-
258- if (throwable != null ) {
259- applicationSpan .addThrowable (throwable );
260- } else if (exitCode != 0 ) {
261- applicationSpan .setError (true );
262- applicationSpan .setTag (
263- DDTags .ERROR_TYPE , "Spark Application Failed with exit code " + exitCode );
264-
265- String errorMessage = getErrorMessageWithoutStackTrace (msg );
266- applicationSpan .setTag (DDTags .ERROR_MSG , errorMessage );
267- applicationSpan .setTag (DDTags .ERROR_STACK , msg );
268- } else if (lastJobFailed ) {
269- applicationSpan .setError (true );
270- applicationSpan .setTag (DDTags .ERROR_TYPE , "Spark Application Failed" );
271- applicationSpan .setTag (DDTags .ERROR_MSG , lastJobFailedMessage );
272- applicationSpan .setTag (DDTags .ERROR_STACK , lastJobFailedStackTrace );
273- }
274-
275- applicationMetrics .setSpanMetrics (applicationSpan );
276- applicationSpan .setMetric ("spark.max_executor_count" , maxExecutorCount );
277- applicationSpan .setMetric (
278- "spark.available_executor_time" , computeCurrentAvailableExecutorTime (time ));
279-
280- applicationSpan .finish (time * 1000 );
281-
282- // write traces synchronously:
283- // as soon as the application finishes, the JVM starts to shut down
284- tracer .flush ();
312+ // if (applicationEnded) {
313+ // return;
314+ // }
315+ // applicationEnded = true;
316+ //
317+ // if (applicationSpan == null && jobCount > 0) {
318+ // // If the application span is not initialized, but spark jobs have been executed, all
319+ // those
320+ // // spark jobs were databricks or streaming. In this case we don't send the application
321+ // span
322+ // return;
323+ // }
324+ // initApplicationSpanIfNotInitialized();
325+ //
326+ // if (throwable != null) {
327+ // applicationSpan.addThrowable(throwable);
328+ // } else if (exitCode != 0) {
329+ // applicationSpan.setError(true);
330+ // applicationSpan.setTag(
331+ // DDTags.ERROR_TYPE, "Spark Application Failed with exit code " + exitCode);
332+ //
333+ // String errorMessage = getErrorMessageWithoutStackTrace(msg);
334+ // applicationSpan.setTag(DDTags.ERROR_MSG, errorMessage);
335+ // applicationSpan.setTag(DDTags.ERROR_STACK, msg);
336+ // } else if (lastJobFailed) {
337+ // applicationSpan.setError(true);
338+ // applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark Application Failed");
339+ // applicationSpan.setTag(DDTags.ERROR_MSG, lastJobFailedMessage);
340+ // applicationSpan.setTag(DDTags.ERROR_STACK, lastJobFailedStackTrace);
341+ // }
342+ //
343+ // applicationMetrics.setSpanMetrics(applicationSpan);
344+ // applicationSpan.setMetric("spark.max_executor_count", maxExecutorCount);
345+ // applicationSpan.setMetric(
346+ // "spark.available_executor_time", computeCurrentAvailableExecutorTime(time));
347+ //
348+ // applicationSpan.finish(time * 1000);
349+ //
350+ // // write traces synchronously:
351+ // // as soon as the application finishes, the JVM starts to shut down
352+ // tracer.flush();
285353 }
286354
287355 private AgentSpan getOrCreateStreamingBatchSpan (
@@ -426,6 +494,9 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
426494 stageToJob .put (stageId , jobStart .jobId ());
427495 }
428496 jobSpans .put (jobStart .jobId (), jobSpan );
497+ if (this .openLineageSparkListener != null ) {
498+ this .openLineageSparkListener .onJobStart (jobStart );
499+ }
429500 }
430501
431502 @ Override
@@ -458,6 +529,9 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
458529 }
459530
460531 jobSpan .finish (jobEnd .time () * 1000 );
532+ if (this .openLineageSparkListener != null ) {
533+ this .openLineageSparkListener .onJobEnd (jobEnd );
534+ }
461535 }
462536
463537 @ Override
@@ -624,6 +698,10 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
624698
625699 Properties props = stageProperties .get (stageSpanKey );
626700 sendTaskSpan (stageSpan , taskEnd , props );
701+
702+ if (this .openLineageSparkListener != null ) {
703+ this .openLineageSparkListener .onTaskEnd (taskEnd );
704+ }
627705 }
628706
629707 private void sendTaskSpan (
@@ -702,6 +780,10 @@ public void onOtherEvent(SparkListenerEvent event) {
702780 onSQLExecutionEnd ((SparkListenerSQLExecutionEnd ) event );
703781 }
704782
783+ if (this .openLineageSparkListener != null ) {
784+ this .openLineageSparkListener .onOtherEvent (event );
785+ }
786+
705787 updateAdaptiveSQLPlan (event );
706788 }
707789
0 commit comments