@@ -813,6 +813,16 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd)
813813 datasets .size (),
814814 sqlEnd .executionId ());
815815
816+ // update the dataset count in the application span
817+ Object datasetCount = applicationSpan .getTag ("spark.sql.dataset_count" );
818+ if (datasetCount != null ) {
819+ applicationSpan .setTag ("spark.sql.dataset_count" , (int ) datasetCount + datasets .size ());
820+ } else {
821+ applicationSpan .setTag ("spark.sql.dataset_count" , datasets .size ());
822+ }
823+
824+ long datasetIndex = datasetCount == null ? 0 : (int ) datasetCount ;
825+
816826 // iterate over the datasets with index
817827 for (int i = 0 ; i < datasets .size (); i ++) {
818828 SparkSQLUtils .LineageDataset dataset = datasets .get (i );
@@ -821,11 +831,22 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd)
821831 continue ;
822832 }
823833
834+ // add to SQL span
824835 span .setTag ("dataset." + i + ".name" , dataset .name );
825836 span .setTag ("dataset." + i + ".schema" , dataset .schema );
826837 span .setTag ("dataset." + i + ".stats" , dataset .stats );
827838 span .setTag ("dataset." + i + ".properties" , dataset .properties );
828839 span .setTag ("dataset." + i + ".type" , dataset .type );
840+
841+ // add to Application span
842+ applicationSpan .setTag ("spark.sql.dataset." + datasetIndex + ".name" , dataset .name );
843+ applicationSpan .setTag ("spark.sql.dataset." + datasetIndex + ".schema" , dataset .schema );
844+ applicationSpan .setTag ("spark.sql.dataset." + datasetIndex + ".stats" , dataset .stats );
845+ applicationSpan .setTag (
846+ "spark.sql.dataset." + datasetIndex + ".properties" , dataset .properties );
847+ applicationSpan .setTag ("spark.sql.dataset." + datasetIndex + ".type" , dataset .type );
848+
849+ datasetIndex ++;
829850 }
830851 }
831852
0 commit comments