
Commit 9292a6c

A Spark instrumentation PoC to collect the table datasets read and written by Spark SQL.

1 parent: aa7092b

File tree: 6 files changed, +163 −12 lines

dd-java-agent/instrumentation/spark/build.gradle
Lines changed: 5 additions & 5 deletions

```diff
@@ -7,8 +7,8 @@ configurations.all {
   resolutionStrategy.deactivateDependencyLocking()
 }
 dependencies {
-  compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
-  compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
+  compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '3.5.1'
+  compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '3.5.1'
 
   testFixturesImplementation group: 'com.datadoghq', name: 'sketches-java', version: '0.8.2'
   testFixturesImplementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.14.0'
@@ -17,7 +17,7 @@ dependencies {
   testFixturesApi project(':dd-java-agent:instrumentation:trace-annotation')
   testFixturesApi project(':dd-java-agent:testing')
 
-  testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
-  testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
-  testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.12', version: '2.4.0'
+  testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '3.5.1'
+  testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '3.5.1'
+  testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.12', version: '3.5.1'
 }
```

dd-java-agent/instrumentation/spark/spark_2.12/build.gradle
Lines changed: 7 additions & 7 deletions

```diff
@@ -2,7 +2,7 @@ plugins {
   id 'java-test-fixtures'
 }
 
-def sparkVersion = '2.4.0'
+def sparkVersion = '3.5.1'
 def scalaVersion = '2.12'
 
 muzzle {
@@ -41,13 +41,13 @@ dependencies {
   testImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "$sparkVersion"
   testImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "$sparkVersion"
 
-  test_spark24Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "2.4.8"
-  test_spark24Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "2.4.8"
-  test_spark24Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "2.4.8"
+  test_spark24Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "3.5.1"
+  test_spark24Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "3.5.1"
+  test_spark24Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "3.5.1"
 
-  test_spark32Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "3.2.4"
-  test_spark32Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "3.2.4"
-  test_spark32Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "3.2.4"
+  test_spark32Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "3.5.1"
+  test_spark32Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "3.5.1"
+  test_spark32Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "3.5.1"
   // We do not support netty versions older than this because of a change to the number of parameters to the
   // PooledByteBufAllocator constructor. See this PR where the new constructor (the only one we support) was introduced:
   // https://github.com/netty/netty/pull/10267
```

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
Lines changed: 3 additions & 0 deletions

```diff
@@ -23,6 +23,9 @@ public String[] helperClassNames() {
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
+      packageName + ".SparkSQLUtils$LineageDataset",
+      packageName + ".SparkSQLUtils$1",
+      packageName + ".SparkSQLUtils$2",
     };
   }
 
```
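The `SparkSQLUtils$1` and `SparkSQLUtils$2` entries refer to the two anonymous `PartialFunction` instances added to `SparkSQLUtils` further down: javac compiles each anonymous inner class into its own class file, named after the enclosing class plus a 1-based index in declaration order, and every class the advice relies on must be listed here so the agent can inject it into the application classloader. A minimal sketch of the naming rule (the `Outer` class is hypothetical, not part of the commit):

```java
// Hypothetical example of how anonymous classes get their binary names.
public class Outer {
  static Runnable first =
      new Runnable() { // compiled to Outer$1.class
        @Override
        public void run() {}
      };

  static Runnable second =
      new Runnable() { // compiled to Outer$2.class
        @Override
        public void run() {}
      };
}
```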

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
Lines changed: 3 additions & 0 deletions

```diff
@@ -23,6 +23,9 @@ public String[] helperClassNames() {
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
+      packageName + ".SparkSQLUtils$LineageDataset",
+      packageName + ".SparkSQLUtils$1",
+      packageName + ".SparkSQLUtils$2",
     };
   }
 
```

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
Lines changed: 60 additions & 0 deletions

```diff
@@ -39,6 +39,8 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskFailedReason;
 import org.apache.spark.scheduler.*;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.QueryExecution;
 import org.apache.spark.sql.execution.SQLExecution;
 import org.apache.spark.sql.execution.SparkPlanInfo;
 import org.apache.spark.sql.execution.metric.SQLMetricInfo;
@@ -102,6 +104,8 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
   protected final HashMap<Long, SparkPlanInfo> sqlPlans = new HashMap<>();
   private final HashMap<String, SparkListenerExecutorAdded> liveExecutors = new HashMap<>();
 
+  private final HashMap<Long, List<SparkSQLUtils.LineageDataset>> lineageDatasets = new HashMap<>();
+
   // There is no easy way to know if an accumulator is not useful anymore (meaning it is not part of
   // an active SQL query)
   // so capping the size of the collection storing them
@@ -752,11 +756,49 @@ private synchronized void updateAdaptiveSQLPlan(SparkListenerEvent event) {
   private synchronized void onSQLExecutionStart(SparkListenerSQLExecutionStart sqlStart) {
     sqlPlans.put(sqlStart.executionId(), sqlStart.sparkPlanInfo());
     sqlQueries.put(sqlStart.executionId(), sqlStart);
+
+    long sqlExecutionId = sqlStart.executionId();
+    QueryExecution queryExecution = SQLExecution.getQueryExecution(sqlExecutionId);
+    if (queryExecution != null) {
+      LogicalPlan logicalPlan = queryExecution.analyzed();
+
+      log.info("Logical plan for query execution id {}: {}", sqlExecutionId, logicalPlan);
+
+      if (logicalPlan != null) {
+        // Collection<DataSourceV2Relation> relations =
+        //     JavaConverters.asJavaCollection(logicalPlan.collect(SparkSQLUtils.pf));
+        // List<SparkSQLUtils.LineageDataset> datasets = new ArrayList<>();
+        //
+        // for (DataSourceV2Relation relation : relations) {
+        //   String name = relation.table().name();
+        //   String schema = relation.schema().json();
+        //   String stats = relation.stats().toString();
+        //   String properties = relation.table().properties().toString();
+        //
+        //   datasets.add(new SparkSQLUtils.LineageDataset(name, schema, stats, properties));
+        // }
+
+        List<SparkSQLUtils.LineageDataset> datasets =
+            JavaConverters.seqAsJavaList(logicalPlan.collect(SparkSQLUtils.logicalPlanToDataset));
+        if (!datasets.isEmpty()) {
+          lineageDatasets.put(sqlExecutionId, datasets);
+        }
+
+        // if (relations.isEmpty()) {
+        //   log.info("No DataSourceV2Relation found for query execution id {}",
+        //       sqlExecutionId);
+        // }
+      }
+    } else {
+      log.warn("Start: QueryExecution not found for sqlEnd queryExecutionId: {}", sqlExecutionId);
+    }
   }
 
   private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) {
     AgentSpan span = sqlSpans.remove(sqlEnd.executionId());
     SparkAggregatedTaskMetrics metrics = sqlMetrics.remove(sqlEnd.executionId());
+    List<SparkSQLUtils.LineageDataset> datasets = lineageDatasets.remove(sqlEnd.executionId());
+
     sqlQueries.remove(sqlEnd.executionId());
     sqlPlans.remove(sqlEnd.executionId());
 
@@ -765,6 +807,24 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd)
       metrics.setSpanMetrics(span);
     }
 
+    if (datasets != null) {
+      log.info(
+          "adding {} datasets to span for query execution id {}",
+          datasets.size(),
+          sqlEnd.executionId());
+
+      // iterate over the datasets with index
+      for (int i = 0; i < datasets.size(); i++) {
+        SparkSQLUtils.LineageDataset dataset = datasets.get(i);
+
+        span.setTag("dataset." + i + ".name", dataset.name);
+        span.setTag("dataset." + i + ".schema", dataset.schema);
+        span.setTag("dataset." + i + ".stats", dataset.stats);
+        span.setTag("dataset." + i + ".properties", dataset.properties);
+        span.setTag("dataset." + i + ".type", dataset.type);
+      }
+    }
+
     span.finish(sqlEnd.time() * 1000);
   }
 }
```

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java
Lines changed: 85 additions & 0 deletions

```diff
@@ -14,12 +14,20 @@
 import java.util.Map;
 import java.util.Set;
 import org.apache.spark.scheduler.AccumulableInfo;
+import org.apache.spark.sql.catalyst.plans.logical.AppendData;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.apache.spark.sql.execution.SparkPlanInfo;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
 import org.apache.spark.sql.execution.metric.SQLMetricInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.PartialFunction;
 import scala.Tuple2;
 import scala.collection.JavaConverters;
 
 public class SparkSQLUtils {
+  private static final Logger log = LoggerFactory.getLogger(SparkSQLUtils.class);
+
   public static void addSQLPlanToStageSpan(
       AgentSpan span,
       SparkPlanInfo plan,
@@ -206,4 +214,81 @@ private void toJson(JsonGenerator generator, Map<Long, AccumulatorWithStage> acc
       generator.writeEndObject();
     }
   }
+
+  static class LineageDataset {
+    final String name;
+    final String schema;
+    final String properties;
+    final String stats;
+    final String type;
+
+    public LineageDataset(
+        String name, String schema, String stats, String properties, String type) {
+      this.name = name;
+      this.schema = schema;
+      this.properties = properties;
+      this.stats = stats;
+      this.type = type;
+    }
+
+    public LineageDataset(String name, String schema, String stats, String properties) {
+      this.name = name;
+      this.schema = schema;
+      this.properties = properties;
+      this.stats = stats;
+      this.type = "unknown";
+    }
+  }
+
+  static PartialFunction<LogicalPlan, DataSourceV2Relation> pf =
+      new PartialFunction<LogicalPlan, DataSourceV2Relation>() {
+        @Override
+        public boolean isDefinedAt(LogicalPlan x) {
+          return x instanceof DataSourceV2Relation;
+        }
+
+        @Override
+        public DataSourceV2Relation apply(LogicalPlan x) {
+          return (DataSourceV2Relation) x;
+        }
+      };
+
+  static PartialFunction<LogicalPlan, LineageDataset> logicalPlanToDataset =
+      new PartialFunction<LogicalPlan, LineageDataset>() {
+        @Override
+        public boolean isDefinedAt(LogicalPlan x) {
+          return x instanceof DataSourceV2Relation
+              || (x instanceof AppendData
+                  && ((AppendData) x).table() instanceof DataSourceV2Relation);
+        }
+
+        @Override
+        public LineageDataset apply(LogicalPlan x) {
+          try {
+            if (x instanceof DataSourceV2Relation) {
+              DataSourceV2Relation relation = (DataSourceV2Relation) x;
+              return new LineageDataset(
+                  relation.table().name(),
+                  relation.schema().json(),
+                  "",
+                  relation.table().properties().toString(),
+                  "input");
+            } else if (x instanceof AppendData) {
+              AppendData appendData = (AppendData) x;
+              DataSourceV2Relation relation = (DataSourceV2Relation) appendData.table();
+              return new LineageDataset(
+                  relation.table().name(),
+                  relation.schema().json(),
+                  "",
+                  relation.table().properties().toString(),
+                  "output");
+            }
+          } catch (Exception e) {
+            log.debug("Error while converting logical plan to dataset", e);
+            return null;
+          }
+
+          return null;
+        }
+      };
 }
```
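`LogicalPlan.collect` comes from Catalyst's `TreeNode`: it traverses the plan tree and applies a `PartialFunction` only to the nodes where `isDefinedAt` returns true, returning a Scala `Seq` of the results. A minimal sketch of that contract using the commit's `logicalPlanToDataset` (the `LineageCollector` helper is hypothetical):

```java
import java.util.List;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import scala.collection.JavaConverters;

// Assumes the same package as SparkSQLUtils, since LineageDataset and
// logicalPlanToDataset are package-private.
class LineageCollector {
  // Collects one LineageDataset per matching plan node, in traversal order;
  // nodes where isDefinedAt is false are skipped.
  static List<SparkSQLUtils.LineageDataset> datasetsOf(LogicalPlan plan) {
    return JavaConverters.seqAsJavaList(plan.collect(SparkSQLUtils.logicalPlanToDataset));
  }
}
```

Note that, as written in the PoC, `apply` returns `null` when table extraction throws, so the collected list can contain `null` entries that a caller may want to filter out.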
