diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java
index fdae211077e..914b0619267 100644
--- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java
+++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java
@@ -3,9 +3,12 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 import org.apache.spark.SparkConf;
 import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.scheduler.StageInfo;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.apache.spark.sql.execution.SparkPlanInfo;
 import org.apache.spark.sql.execution.metric.SQLMetricInfo;
 import scala.collection.JavaConverters;
@@ -53,6 +56,15 @@ protected List<SQLMetricInfo> getPlanInfoMetrics(SparkPlanInfo info) {
     return JavaConverters.seqAsJavaList(info.metrics());
   }
 
+  @Override
+  protected List<SparkSQLUtils.LineageDataset> parseDatasetsFromLogicalPlan(
+      LogicalPlan logicalPlan) {
+    return JavaConverters.seqAsJavaList(logicalPlan.collect(SparkSQLUtils.logicalPlanToDataset))
+        .stream()
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
   @Override
   protected int[] getStageParentIds(StageInfo info) {
     int[] parentIds = new int[info.parentIds().length()];
diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
index 15e6fa5a80f..a7aecd2598d 100644
--- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
+++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
@@ -23,6 +23,8 @@ public String[] helperClassNames() {
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
+      packageName + ".SparkSQLUtils$LineageDataset",
+      packageName + ".SparkSQLUtils$1",
     };
   }
 
diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java
index 115cdcbb9b0..52dbbf9cb64 100644
--- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java
+++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java
@@ -3,9 +3,12 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 import org.apache.spark.SparkConf;
 import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.scheduler.StageInfo;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.apache.spark.sql.execution.SparkPlanInfo;
 import org.apache.spark.sql.execution.metric.SQLMetricInfo;
 import scala.jdk.javaapi.CollectionConverters;
@@ -53,6 +56,15 @@ protected List<SQLMetricInfo> getPlanInfoMetrics(SparkPlanInfo info) {
     return CollectionConverters.asJava(info.metrics());
   }
 
+  @Override
+  protected List<SparkSQLUtils.LineageDataset> parseDatasetsFromLogicalPlan(
+      LogicalPlan logicalPlan) {
+    return CollectionConverters.asJava(logicalPlan.collect(SparkSQLUtils.logicalPlanToDataset))
+        .stream()
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
   @Override
   protected int[] getStageParentIds(StageInfo info) {
     int[] parentIds = new int[info.parentIds().length()];
diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
index 0d80eb7553c..4231b2b45d3 100644
--- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
+++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
@@ -23,6 +23,8 @@ public String[] helperClassNames() {
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
+      packageName + ".SparkSQLUtils$LineageDataset",
+      packageName + ".SparkSQLUtils$1",
    };
  }
 
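Note on the helper-class additions above: logicalPlanToDataset is declared further down in SparkSQLUtils as an anonymous implementation of Scala's PartialFunction, which javac compiles to the nested class SparkSQLUtils$1; both it and SparkSQLUtils$LineageDataset therefore have to be listed in helperClassNames() so the agent injects them alongside the other helpers. LogicalPlan.collect applies the partial function to every node of the analyzed plan and returns the matches as a Scala Seq, which each listener converts with its version-specific converter (JavaConverters on 2.12, CollectionConverters on 2.13). A rough Java sketch of what collect does conceptually; this is illustrative only, and preOrderNodes is a hypothetical traversal helper, not a Spark API:

  // Illustrative only (not part of the patch): LogicalPlan.collect(pf) keeps pf.apply(node)
  // for every node where pf.isDefinedAt(node) is true.
  static List<SparkSQLUtils.LineageDataset> collectDatasets(
      LogicalPlan plan, PartialFunction<LogicalPlan, SparkSQLUtils.LineageDataset> pf) {
    List<SparkSQLUtils.LineageDataset> out = new ArrayList<>();
    for (LogicalPlan node : preOrderNodes(plan)) { // hypothetical helper
      if (pf.isDefinedAt(node)) {
        out.add(pf.apply(node));
      }
    }
    return out;
  }
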
diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
index bbd03d5b897..7142a76dea1 100644
--- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
@@ -23,11 +23,13 @@
 import java.lang.invoke.MethodHandle;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.MethodType;
+import java.lang.reflect.Field;
 import java.time.OffsetDateTime;
 import java.time.format.DateTimeParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -36,10 +38,13 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.spark.ExceptionFailure;
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskFailedReason;
 import org.apache.spark.scheduler.*;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.QueryExecution;
 import org.apache.spark.sql.execution.SQLExecution;
 import org.apache.spark.sql.execution.SparkPlanInfo;
 import org.apache.spark.sql.execution.metric.SQLMetricInfo;
@@ -103,6 +108,9 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
   protected final HashMap<Long, SparkPlanInfo> sqlPlans = new HashMap<>();
   private final HashMap<String, SparkListenerExecutorAdded> liveExecutors = new HashMap<>();
 
+  private final List<SparkSQLUtils.LineageDataset> inputLineageDatasets = new ArrayList<>();
+  private final List<SparkSQLUtils.LineageDataset> outputLineageDatasets = new ArrayList<>();
+
   // There is no easy way to know if an accumulator is not useful anymore (meaning it is not part of
   // an active SQL query)
   // so capping the size of the collection storing them
@@ -173,6 +181,9 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sparkVersion) {
   /** Parent Ids of a Stage. Provide an implementation based on a specific scala version */
   protected abstract int[] getStageParentIds(StageInfo info);
 
+  protected abstract List<SparkSQLUtils.LineageDataset> parseDatasetsFromLogicalPlan(
+      LogicalPlan logicalPlan);
+
   @Override
   public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
     this.applicationStart = applicationStart;
@@ -758,6 +769,7 @@ private synchronized void onSQLExecutionStart(SparkListenerSQLExecutionStart sqlStart) {
   private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) {
     AgentSpan span = sqlSpans.remove(sqlEnd.executionId());
     SparkAggregatedTaskMetrics metrics = sqlMetrics.remove(sqlEnd.executionId());
+    sqlQueries.remove(sqlEnd.executionId());
     sqlPlans.remove(sqlEnd.executionId());
 
@@ -768,6 +780,123 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) {
       span.finish(sqlEnd.time() * 1000);
     }
+
+    try {
+      addLineageDatasetsToSpan(sqlEnd);
+    } catch (Throwable ignored) {
+    }
+  }
+
+  private void addLineageDatasetsToSpan(SparkListenerSQLExecutionEnd sqlEnd) {
+    if (!Config.get().isSparkDataLineageEnabled()) {
+      return;
+    }
+
+    if (null == applicationSpan) {
+      return;
+    }
+
+    if (inputLineageDatasets.size() + outputLineageDatasets.size()
+        >= Config.get().getSparkDataLineageLimit()) {
+      return;
+    }
+
+    Optional<LogicalPlan> logicalPlan = getAnalyzedLogicalPlan(sqlEnd);
+    if (!logicalPlan.isPresent()) {
+      return;
+    }
+
+    List<SparkSQLUtils.LineageDataset> datasets =
+        adjustForLimit(parseDatasetsFromLogicalPlan(logicalPlan.get()));
+    if (datasets.isEmpty()) {
+      return;
+    }
+
+    log.info(
+        "adding {} datasets to span for query execution id {}",
+        datasets.size(),
+        sqlEnd.executionId());
+
+    List<SparkSQLUtils.LineageDataset> inputDatasets =
+        datasets.stream()
+            .filter(dataset -> dataset.type.equals("input"))
+            .collect(Collectors.toList());
+    List<SparkSQLUtils.LineageDataset> outputDatasets =
+        datasets.stream()
+            .filter(dataset -> dataset.type.equals("output"))
+            .collect(Collectors.toList());
+
+    updateLineageDatasetTag(
+        applicationSpan, inputDatasets, inputLineageDatasets, "spark.sql.dataset.input");
+    updateLineageDatasetTag(
+        applicationSpan, outputDatasets, outputLineageDatasets, "spark.sql.dataset.output");
+  }
+
+  private void updateLineageDatasetTag(
+      AgentSpan span,
+      List<SparkSQLUtils.LineageDataset> datasets,
+      List<SparkSQLUtils.LineageDataset> lineageDatasets,
+      String tagPrefix) {
+    if (datasets.isEmpty()) {
+      return;
+    }
+
+    final int lastLineageDatasetIndex = lineageDatasets.size();
+    for (int i = 0; i < datasets.size(); i++) {
+      SparkSQLUtils.LineageDataset dataset = datasets.get(i);
+      int datasetIndex = lastLineageDatasetIndex + i;
+
+      span.setTag(tagPrefix + ".details." + datasetIndex + ".name", dataset.name);
+      span.setTag(tagPrefix + ".details." + datasetIndex + ".schema", dataset.schema);
+      span.setTag(tagPrefix + ".details." + datasetIndex + ".stats", dataset.stats);
+      span.setTag(tagPrefix + ".details." + datasetIndex + ".properties", dataset.properties);
+      span.setTag(tagPrefix + ".details." + datasetIndex + ".type", dataset.type);
+    }
+
+    lineageDatasets.addAll(datasets);
+    try {
+      applicationSpan.setTag(tagPrefix + ".json", objectMapper.writeValueAsString(lineageDatasets));
+      applicationSpan.setTag(tagPrefix + ".count", lineageDatasets.size());
+    } catch (Exception ignored) {
+    }
+  }
+
+  private List<SparkSQLUtils.LineageDataset> adjustForLimit(
+      List<SparkSQLUtils.LineageDataset> datasets) {
+    final int currentLineageDatasetSize =
+        inputLineageDatasets.size() + outputLineageDatasets.size();
+    if (currentLineageDatasetSize >= Config.get().getSparkDataLineageLimit()) {
+      return Collections.emptyList();
+    }
+
+    final int datasetAboveLimit =
+        currentLineageDatasetSize + datasets.size() - Config.get().getSparkDataLineageLimit();
+    if (datasetAboveLimit > 0) {
+      datasets = datasets.subList(0, datasets.size() - datasetAboveLimit);
+    }
+
+    return datasets;
+  }
+
+  private Optional<LogicalPlan> getAnalyzedLogicalPlan(SparkListenerSQLExecutionEnd sqlEnd) {
+    long sqlExecutionId = sqlEnd.executionId();
+    QueryExecution queryExecution = SQLExecution.getQueryExecution(sqlExecutionId);
+
+    if (queryExecution != null) {
+      return Optional.ofNullable(queryExecution.analyzed());
+    }
+
+    try {
+      Field qeField = sqlEnd.getClass().getDeclaredField("qe");
+      qeField.setAccessible(true);
+      QueryExecution qe = (QueryExecution) qeField.get(sqlEnd);
+      if (qe != null) {
+        return Optional.ofNullable(qe.analyzed());
+      }
+    } catch (Exception ignored) {
+    }
+
+    return Optional.empty();
   }
 
   private synchronized void onStreamingQueryStartedEvent(
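For reference, updateLineageDatasetTag above writes flat per-dataset tags plus two aggregate tags per direction on the application span. An illustrative layout for a single input dataset (tag names taken from the code above, descriptions added here):

  spark.sql.dataset.input.details.0.name        table name
  spark.sql.dataset.input.details.0.schema      schema as JSON (StructType.json())
  spark.sql.dataset.input.details.0.stats       size / row-count statistics as JSON
  spark.sql.dataset.input.details.0.properties  table properties as JSON
  spark.sql.dataset.input.details.0.type        "input"
  spark.sql.dataset.input.json                  JSON array of all input datasets seen so far
  spark.sql.dataset.input.count                 running count of input datasets

Output datasets use the spark.sql.dataset.output prefix, and indices keep growing across SQL executions because lastLineageDatasetIndex starts from the size of the accumulated list.
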
diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java
index f3f1536c42a..398177fc009 100644
--- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java
@@ -1,25 +1,145 @@
 package datadog.trace.instrumentation.spark;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import de.thetaphi.forbiddenapis.SuppressForbidden;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.spark.scheduler.AccumulableInfo;
+import org.apache.spark.sql.catalyst.analysis.NamedRelation;
+import org.apache.spark.sql.catalyst.catalog.CatalogStatistics;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation;
+import org.apache.spark.sql.catalyst.plans.logical.AppendData;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.plans.logical.Statistics;
 import org.apache.spark.sql.execution.SparkPlanInfo;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
 import org.apache.spark.sql.execution.metric.SQLMetricInfo;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.PartialFunction;
 import scala.Tuple2;
 import scala.collection.JavaConverters;
 
 public class SparkSQLUtils {
+  private static final Logger log = LoggerFactory.getLogger(SparkSQLUtils.class);
+
+  private static final Class<?> dataSourceV2RelationClass;
+  private static final MethodHandle tableMethod;
+  private static final MethodHandle computeStatsMethod;
+
+  private static final Class<?> replaceDataClass;
+  private static final MethodHandle tableMethodForReplaceData;
+
+  private static final Class<?> insertIntoHiveTableClass;
+
+  private static final Class<?> tableClass;
+  private static final MethodHandle schemaMethod;
+  private static final MethodHandle nameMethod;
+  private static final MethodHandle propertiesMethod;
+
+  @SuppressForbidden // Using reflection to avoid splitting the instrumentation once more
+  private static Class<?> findDataSourceV2Relation() throws ClassNotFoundException {
+    return Class.forName("org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation");
+  }
+
+  @SuppressForbidden // Using reflection to avoid splitting the instrumentation once more
+  private static Class<?> findTable() throws ClassNotFoundException {
+    return Class.forName("org.apache.spark.sql.connector.catalog.Table");
+  }
+
+  @SuppressForbidden // Using reflection to avoid splitting the instrumentation once more
+  private static Class<?> findReplaceData() throws ClassNotFoundException {
+    return Class.forName("org.apache.spark.sql.catalyst.plans.logical.ReplaceData");
+  }
+
+  @SuppressForbidden // Using reflection to avoid splitting the instrumentation once more
+  private static Class<?> findInsertIntoHiveTable() throws ClassNotFoundException {
+    return Class.forName("org.apache.spark.sql.hive.execution.InsertIntoHiveTable");
+  }
+
+  static {
+    Class<?> relationClassFound = null;
+    MethodHandle tableMethodFound = null;
+    MethodHandle computeStatsMethodFound = null;
+
+    Class<?> replaceDataClassFound = null;
+    MethodHandle tableMethodForReplaceDataFound = null;
+
+    Class<?> insertIntoHiveTableClassFound = null;
+
+    Class<?> tableClassFound = null;
+    MethodHandle nameMethodFound = null;
+    MethodHandle schemaMethodFound = null;
+    MethodHandle propertiesMethodFound = null;
+
+    try {
+      MethodHandles.Lookup lookup = MethodHandles.lookup();
+
+      // TODO: this class contains different fields and methods in different versions of Spark
+      // (e.g. 2.4 vs 3.5)
+      relationClassFound = findDataSourceV2Relation();
+      tableClassFound = findTable();
+      replaceDataClassFound = findReplaceData();
+      insertIntoHiveTableClassFound = findInsertIntoHiveTable();
+
+      if (relationClassFound != null && tableClassFound != null) {
+        tableMethodFound =
+            lookup.findVirtual(relationClassFound, "table", MethodType.methodType(tableClassFound));
+        computeStatsMethodFound =
+            lookup.findVirtual(
+                relationClassFound, "computeStats", MethodType.methodType(Statistics.class));
+      }
+
+      if (tableClassFound != null) {
+        schemaMethodFound =
+            lookup.findVirtual(tableClassFound, "schema", MethodType.methodType(StructType.class));
+        nameMethodFound =
+            lookup.findVirtual(tableClassFound, "name", MethodType.methodType(String.class));
+        propertiesMethodFound =
+            lookup.findVirtual(tableClassFound, "properties", MethodType.methodType(Map.class));
+      }
+
+      if (replaceDataClassFound != null) {
+        tableMethodForReplaceDataFound =
+            lookup.findVirtual(
+                replaceDataClassFound, "table", MethodType.methodType(NamedRelation.class));
+      }
+    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException ignored) {
+    }
+
+    dataSourceV2RelationClass = relationClassFound;
+    tableMethod = tableMethodFound;
+    computeStatsMethod = computeStatsMethodFound;
+
+    replaceDataClass = replaceDataClassFound;
+    tableMethodForReplaceData = tableMethodForReplaceDataFound;
+
+    insertIntoHiveTableClass = insertIntoHiveTableClassFound;
+
+    tableClass = tableClassFound;
+    schemaMethod = schemaMethodFound;
+    nameMethod = nameMethodFound;
+    propertiesMethod = propertiesMethodFound;
+  }
+
 public static void addSQLPlanToStageSpan(
     AgentSpan span,
     SparkPlanInfo plan,
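The static initializer above resolves everything reflectively because DataSourceV2Relation, the connector Table interface and ReplaceData are either missing or shaped differently across the Spark versions this single instrumentation supports (see the TODO above); when a class or method cannot be found, the corresponding field stays null and the lineage parser below simply skips that case. A condensed sketch of the null-guarded invocation pattern the parser relies on further down (this helper is illustrative, not new API; MethodHandle.invoke throws Throwable, hence the broad catch):

  // Condensed sketch of the guarded MethodHandle usage in parseDataSourceV2Relation below.
  private static String tableNameOrNull(Object table) {
    try {
      if (nameMethod != null && tableClass != null && tableClass.isInstance(table)) {
        return (String) nameMethod.invoke(table);
      }
    } catch (Throwable ignored) {
    }
    return null;
  }
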
@@ -206,4 +326,217 @@ private void toJson(JsonGenerator generator, Map acc
       generator.writeEndObject();
     }
   }
+
+  static class LineageDataset {
+    @JsonProperty final String name;
+
+    @JsonProperty final String schema;
+
+    @JsonProperty final String properties;
+
+    @JsonProperty final String stats;
+
+    @JsonProperty final String type;
+
+    public LineageDataset(
+        String name, String schema, String stats, String properties, String type) {
+      this.name = name;
+      this.schema = schema;
+      this.properties = properties;
+      this.stats = stats;
+      this.type = type;
+    }
+  }
+
+  static PartialFunction<LogicalPlan, LineageDataset> logicalPlanToDataset =
+      new PartialFunction<LogicalPlan, LineageDataset>() {
+        private final ObjectMapper mapper =
+            new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+        @Override
+        public boolean isDefinedAt(LogicalPlan x) {
+          return x instanceof DataSourceV2Relation
+              || (x instanceof AppendData
+                  && ((AppendData) x).table() instanceof DataSourceV2Relation)
+              || (replaceDataClass != null && replaceDataClass.isInstance(x))
+              || (insertIntoHiveTableClass != null && insertIntoHiveTableClass.isInstance(x))
+              || x instanceof HiveTableRelation;
+        }
+
+        @Override
+        public LineageDataset apply(LogicalPlan x) {
+          try {
+            if (dataSourceV2RelationClass != null && dataSourceV2RelationClass.isInstance(x)) {
+              return parseDataSourceV2Relation(x, "input");
+            } else if (x instanceof AppendData) {
+              AppendData appendData = (AppendData) x;
+              NamedRelation table = appendData.table();
+              if (dataSourceV2RelationClass != null
+                  && dataSourceV2RelationClass.isInstance(table)) {
+                return parseDataSourceV2Relation(table, "output");
+              }
+            } else if (replaceDataClass != null
+                && replaceDataClass.isInstance(x)
+                && tableMethodForReplaceData != null) {
+              Object table = tableMethodForReplaceData.invoke(x);
+              if (table != null
+                  && dataSourceV2RelationClass != null
+                  && dataSourceV2RelationClass.isInstance(table)) {
+                return parseDataSourceV2Relation(table, "output");
+              }
+            } else if (insertIntoHiveTableClass != null && insertIntoHiveTableClass.isInstance(x)) {
+              return parseCatalogTable(
+                  (CatalogTable) x.getClass().getMethod("table").invoke(x), "output");
+            } else if (x instanceof HiveTableRelation) {
+              return parseCatalogTable(((HiveTableRelation) x).tableMeta(), "input");
+            }
+          } catch (Throwable ignored) {
+            log.debug("Error while converting logical plan to dataset", ignored);
+          }
+          return null;
+        }
+
+        private LineageDataset parseCatalogTable(CatalogTable table, String datasetType) {
+          Map<String, String> properties = new HashMap<>();
+
+          if (table.provider().isDefined()) {
+            properties.put("provider", table.provider().get());
+          }
+
+          if (table.storage().locationUri().isDefined()) {
+            properties.put("location", table.storage().locationUri().get().toString());
+          }
+          properties.put("storage", table.storage().toString());
+          properties.put("created_time", Long.toString(table.createTime()));
+          properties.put("last_access_time", Long.toString(table.lastAccessTime()));
+          properties.put("owner", table.owner());
+          properties.put("comment", table.comment().getOrElse(() -> ""));
+
+          String propertiesJson = null;
+          try {
+            propertiesJson = mapper.writeValueAsString(properties);
+          } catch (JsonProcessingException ignored) {
+            log.debug("Error while converting properties to JSON", ignored);
+          }
+
+          String statsJson = getCatalogTableStatsJson(table);
+
+          return new LineageDataset(
+              table.qualifiedName(), table.schema().json(), statsJson, propertiesJson, datasetType);
+        }
+
+        private LineageDataset parseDataSourceV2Relation(Object logicalPlan, String datasetType) {
+          if (null == dataSourceV2RelationClass
+              || !dataSourceV2RelationClass.isInstance(logicalPlan)) {
+            return null;
+          }
+
+          if (null == tableMethod || null == tableClass) {
+            log.debug(
+                "table method or table class is not found, cannot parse current DataSourceV2Relation");
+            return null;
+          }
+
+          try {
+            String tableName = null;
+            String tableSchema = null;
+            String propertiesJson = null;
+
+            Object table = tableMethod.invoke(logicalPlan);
+            if (null == table) {
+              return null;
+            }
+
+            if (null == tableClass || !tableClass.isInstance(table)) {
+              log.debug("table is not instance of a Table class, cannot parse current LogicalPlan");
+              return null;
+            }
+
+            if (nameMethod != null) {
+              tableName = (String) nameMethod.invoke(table);
+            }
+
+            if (schemaMethod != null) {
+              StructType schema = (StructType) schemaMethod.invoke(table);
+              tableSchema = schema.json();
+            }
+
+            if (propertiesMethod != null) {
+              Map<String, String> propertyMap =
+                  (Map<String, String>) propertiesMethod.invoke(table);
+              propertiesJson = mapper.writeValueAsString(propertyMap);
+              log.debug("method properties found with content of {}", propertiesJson);
+            }
+
+            String statsJson = getDataSourceV2RelationStatsJson(logicalPlan);
+
+            return new LineageDataset(
+                tableName, tableSchema, statsJson, propertiesJson, datasetType);
+          } catch (Throwable ignored) {
+            log.debug("Error while converting logical plan to dataset", ignored);
+            return null;
+          }
+        }
+
+        private String getCatalogTableStatsJson(CatalogTable table) {
+          if (!table.stats().isDefined()) {
+            return null;
+          }
+
+          String statsJson = null;
+
+          try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+              JsonGenerator generator = mapper.getFactory().createGenerator(outputStream)) {
+            CatalogStatistics stats = table.stats().get();
+
+            generator.writeStartObject();
+            generator.writeNumberField("sizeInBytes", stats.sizeInBytes().longValue());
+            if (stats.rowCount().isDefined()) {
+              generator.writeNumberField("rowCount", stats.rowCount().get().longValue());
+            }
+            generator.writeObjectField("colStats", stats.colStats());
+            generator.writeEndObject();
+
+            statsJson = new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
+
+            log.debug("CatalogTable stats: {}", stats);
+          } catch (Throwable ignored) {
+          }
+
+          return statsJson;
+        }
+
+        private String getDataSourceV2RelationStatsJson(Object logicalPlan) {
+          if (null == computeStatsMethod
+              || null == dataSourceV2RelationClass
+              || !dataSourceV2RelationClass.isInstance(logicalPlan)) {
+            return null;
+          }
+
+          String statsJson = null;
+
+          try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+              JsonGenerator generator = mapper.getFactory().createGenerator(outputStream)) {
+            Statistics stats = (Statistics) computeStatsMethod.invoke(logicalPlan);
+
+            if (stats != null) {
+              log.debug("DataSourceV2Relation stats: {}", stats);
+
+              generator.writeStartObject();
+              generator.writeNumberField("sizeInBytes", stats.sizeInBytes().longValue());
+              if (stats.rowCount().isDefined()) {
+                generator.writeNumberField("rowCount", stats.rowCount().get().longValue());
+              }
+              generator.writeObjectField("attributeStats", stats.attributeStats());
+              generator.writeEndObject();
+
+              statsJson = new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
+            }
+          } catch (Throwable ignored) {
+            log.debug("Error while converting DataSourceV2Relation stats to JSON", ignored);
+          }
+
+          return statsJson;
+        }
+      };
 }
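For the serialized shapes produced by the helpers above: LineageDataset instances are serialized through their @JsonProperty fields in declaration order, and the two stats helpers emit small JSON objects. An illustrative example with made-up values (not taken from a real run):

  // spark.sql.dataset.input.json might look like:
  //   [{"name":"my_catalog.db.events",
  //     "schema":"{\"type\":\"struct\",...}",
  //     "properties":"{\"provider\":\"parquet\",...}",
  //     "stats":"{\"sizeInBytes\":1048576,\"rowCount\":1000,\"colStats\":{}}",
  //     "type":"input"}]
  // getDataSourceV2RelationStatsJson emits "attributeStats" instead of "colStats".
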
diff --git a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java
index fe360b81dd8..9b6e8bcc062 100644
--- a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java
+++ b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java
@@ -249,6 +249,8 @@ public final class ConfigDefaults {
   static final boolean DEFAULT_SPARK_TASK_HISTOGRAM_ENABLED = true;
   static final boolean DEFAULT_SPARK_APP_NAME_AS_SERVICE = false;
+  static final boolean DEFAULT_SPARK_DATA_LINEAGE_ENABLED = true;
+  static final int DEFAULT_SPARK_DATA_LINEAGE_LIMIT = 500;
   static final boolean DEFAULT_JAX_RS_EXCEPTION_AS_ERROR_ENABLED = true;
   static final boolean DEFAULT_TELEMETRY_DEBUG_REQUESTS_ENABLED = false;
diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java
index f15f2899d79..13fde15cd1b 100644
--- a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java
+++ b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java
@@ -157,6 +157,8 @@ public final class TraceInstrumentationConfig {
   public static final String SPARK_TASK_HISTOGRAM_ENABLED = "spark.task-histogram.enabled";
   public static final String SPARK_APP_NAME_AS_SERVICE = "spark.app-name-as-service";
+  public static final String SPARK_DATA_LINEAGE_ENABLED = "spark.data.lineage.enabled";
+  public static final String SPARK_DATA_LINEAGE_LIMIT = "spark.data.lineage.limit";
   public static final String JAX_RS_EXCEPTION_AS_ERROR_ENABLED =
       "trace.jax-rs.exception-as-error.enabled";
diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java
index 02b78b68031..b22774e7b77 100644
--- a/internal-api/src/main/java/datadog/trace/api/Config.java
+++ b/internal-api/src/main/java/datadog/trace/api/Config.java
@@ -524,6 +524,8 @@ public static String getHostName() {
   private final boolean elasticsearchBodyAndParamsEnabled;
   private final boolean sparkTaskHistogramEnabled;
   private final boolean sparkAppNameAsService;
+  private final boolean sparkDataLineageEnabled;
+  private final int sparkDataLineageLimit;
   private final boolean jaxRsExceptionAsErrorsEnabled;
   private final boolean axisPromoteResourceName;
@@ -1764,6 +1766,12 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment())
         configProvider.getBoolean(
             SPARK_APP_NAME_AS_SERVICE, ConfigDefaults.DEFAULT_SPARK_APP_NAME_AS_SERVICE);
 
+    this.sparkDataLineageEnabled =
+        configProvider.getBoolean(SPARK_DATA_LINEAGE_ENABLED, DEFAULT_SPARK_DATA_LINEAGE_ENABLED);
+
+    this.sparkDataLineageLimit =
+        configProvider.getInteger(SPARK_DATA_LINEAGE_LIMIT, DEFAULT_SPARK_DATA_LINEAGE_LIMIT);
+
     this.jaxRsExceptionAsErrorsEnabled =
         configProvider.getBoolean(
             JAX_RS_EXCEPTION_AS_ERROR_ENABLED,
@@ -3341,6 +3349,14 @@ public boolean useSparkAppNameAsService() {
     return sparkAppNameAsService;
   }
 
+  public boolean isSparkDataLineageEnabled() {
+    return sparkDataLineageEnabled;
+  }
+
+  public int getSparkDataLineageLimit() {
+    return sparkDataLineageLimit;
+  }
+
   public boolean isJaxRsExceptionAsErrorEnabled() {
     return jaxRsExceptionAsErrorsEnabled;
   }
@@ -4559,6 +4575,10 @@ public String toString() {
         + sparkTaskHistogramEnabled
         + ", sparkAppNameAsService="
         + sparkAppNameAsService
+        + ", sparkDataLineageEnabled="
+        + sparkDataLineageEnabled
+        + ", sparkDataLineageLimit="
+        + sparkDataLineageLimit
         + ", jaxRsExceptionAsErrorsEnabled="
         + jaxRsExceptionAsErrorsEnabled
         + ", axisPromoteResourceName="
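Configuration note: the two new settings default to enabled with a cap of 500 collected datasets per application (ConfigDefaults above). Assuming the usual dd-trace-java mapping from keys defined in TraceInstrumentationConfig to system properties and environment variables (an assumption based on convention, not shown by this patch), they would be set like:

  // Hypothetical JVM flags / environment variables, following the usual "dd." prefix convention:
  // -Ddd.spark.data.lineage.enabled=false   or   DD_SPARK_DATA_LINEAGE_ENABLED=false
  // -Ddd.spark.data.lineage.limit=100       or   DD_SPARK_DATA_LINEAGE_LIMIT=100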