diff --git a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
index 9367f31e1..7267dc19f 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
@@ -79,6 +79,14 @@ trait Analyzer[S <: State[_], +M <: Metric[_]] extends Serializable {
     */
   def computeMetricFrom(state: Option[S]): M
 
+  /**
+   * Returns the columns this analyzer reads from the data, if they are known statically.
+   * Some(columns) lists every referenced column; the set may be empty (e.g. ColumnCount).
+   * None, the default, means arbitrary columns may be referenced (e.g. free-form SQL predicates).
+   * Used by AnalysisRunner to enable column pruning for V2 DataSource connectors like Iceberg.
+   */
+  def columnsReferenced(): Option[Set[String]] = None
+
   /**
     * A set of assertions that must hold on the schema of the data frame
     * @return
diff --git a/src/main/scala/com/amazon/deequ/analyzers/ApproxCountDistinct.scala b/src/main/scala/com/amazon/deequ/analyzers/ApproxCountDistinct.scala
index c33c8640d..a644172ad 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/ApproxCountDistinct.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/ApproxCountDistinct.scala
@@ -64,4 +64,7 @@ case class ApproxCountDistinct(column: String, where: Option[String] = None)
   }
 
   override def filterCondition: Option[String] = where
+
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set(column))
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/ApproxQuantile.scala b/src/main/scala/com/amazon/deequ/analyzers/ApproxQuantile.scala
index 8b779f391..7a564dde1 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/ApproxQuantile.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/ApproxQuantile.scala
@@ -109,4 +109,7 @@ case class ApproxQuantile(
   }
 
   override def filterCondition: Option[String] = where
+
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set(column))
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/ApproxQuantiles.scala b/src/main/scala/com/amazon/deequ/analyzers/ApproxQuantiles.scala
index 6d6c28e8c..84e694bb9 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/ApproxQuantiles.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/ApproxQuantiles.scala
@@ -98,4 +98,6 @@ case class ApproxQuantiles(column: String, quantiles: Seq[Double], relativeError
   override def preconditions: Seq[StructType => Unit] = {
     PARAM_CHECKS :: hasColumn(column) :: isNumeric(column) :: Nil
   }
+
+  override def columnsReferenced(): Option[Set[String]] = Some(Set(column))
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/ColumnCount.scala b/src/main/scala/com/amazon/deequ/analyzers/ColumnCount.scala
index 9eff89b6d..95da93f7d 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/ColumnCount.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/ColumnCount.scala
@@ -60,4 +60,6 @@ case class ColumnCount() extends Analyzer[NumMatches, DoubleMetric] {
   override private[deequ] def toFailureMetric(failure: Exception): DoubleMetric = {
     Analyzers.metricFromFailure(failure, name, instance, entity)
   }
+
+  override def columnsReferenced(): Option[Set[String]] = Some(Set.empty)
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/ColumnExists.scala b/src/main/scala/com/amazon/deequ/analyzers/ColumnExists.scala
index 1e989e10e..46619015b 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/ColumnExists.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/ColumnExists.scala
@@ -64,4 +64,6 @@ case class ColumnExists(column: String) extends Analyzer[ColumnExistsState, Doub
   override private[deequ] def toFailureMetric(failure: Exception): DoubleMetric = {
     Analyzers.metricFromFailure(failure, name, instance, entity)
   }
+
+  override def columnsReferenced(): Option[Set[String]] = Some(Set.empty)
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Completeness.scala b/src/main/scala/com/amazon/deequ/analyzers/Completeness.scala
index 3a262d7cc..cb4379f65 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Completeness.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Completeness.scala
@@ -50,6 +50,9 @@ case class Completeness(column: String, where: Option[String] = None,
 
   override def filterCondition: Option[String] = where
 
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set(column))
+
   @VisibleForTesting // required by some tests that compare analyzer results to an expected state
   private[deequ] def criterion: Column = conditionalSelection(column, where).isNotNull
 
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Compliance.scala b/src/main/scala/com/amazon/deequ/analyzers/Compliance.scala
index f5e3b8a1b..6094b91a4 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Compliance.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Compliance.scala
@@ -75,4 +75,8 @@ case class Compliance(instance: String,
 
   override protected def additionalPreconditions(): Seq[StructType => Unit] =
     columns.map(hasColumn)
+
+  // The predicate is free-form SQL that can reference arbitrary columns, so the columns needed
+  // cannot be determined statically (the optional `columns` list is not treated as exhaustive).
+  override def columnsReferenced(): Option[Set[String]] = None
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Correlation.scala b/src/main/scala/com/amazon/deequ/analyzers/Correlation.scala
index c323456a8..4fcee8159 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Correlation.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Correlation.scala
@@ -105,4 +105,7 @@ case class Correlation(
   }
 
   override def filterCondition: Option[String] = where
+
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set(firstColumn, secondColumn))
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/DataType.scala b/src/main/scala/com/amazon/deequ/analyzers/DataType.scala
index fb3c1ca06..829bab8ab 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/DataType.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/DataType.scala
@@ -183,4 +183,7 @@ case class DataType(
   }
 
   override def filterCondition: Option[String] = where
+
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set(column))
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/ExactQuantile.scala b/src/main/scala/com/amazon/deequ/analyzers/ExactQuantile.scala
index b3ef9878e..3dfe64fbe 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/ExactQuantile.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/ExactQuantile.scala
@@ -60,6 +60,9 @@ with FilterableAnalyzer {
 
   override def filterCondition: Option[String] = where
 
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set(column))
+
   @VisibleForTesting
   private def criterion: Column = conditionalSelection(column, where).cast(DoubleType)
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/KLLSketch.scala b/src/main/scala/com/amazon/deequ/analyzers/KLLSketch.scala
index 502d59874..3f49456dc 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/KLLSketch.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/KLLSketch.scala
@@ -167,6 +167,8 @@ case class KLLSketch(
   override def preconditions(): Seq[StructType => Unit] = {
     PARAM_CHECK :: hasColumn(column) :: isNumeric(column) :: Nil
   }
+
+  override def columnsReferenced(): Option[Set[String]] = Some(Set(column))
 }
 
 object KLLSketch {
diff --git a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala
index 141d92fb5..9791705a5 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/MaxLength.scala
@@ -54,6 +54,9 @@ case class MaxLength(column: String, where: Option[String] = None, analyzerOptio
 
   override def filterCondition: Option[String] = where
 
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set(column))
+
   private[deequ] def criterion: Column = {
     val isNullCheck = col(column).isNull
     val colLength = length(col(column)).cast(DoubleType)
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Maximum.scala b/src/main/scala/com/amazon/deequ/analyzers/Maximum.scala
index 1e52a7ae4..9aebc6f17 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Maximum.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Maximum.scala
@@ -64,6 +64,9 @@ case class Maximum(column: String, where: Option[String] = None, analyzerOptions
 
   override def filterCondition: Option[String] = where
 
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set(column))
+
   @VisibleForTesting
   private def criterion: Column = conditionalSelectionWithAugmentedOutcome(col(column), where)
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Mean.scala b/src/main/scala/com/amazon/deequ/analyzers/Mean.scala
index d9afbd227..ac9bec9d1 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Mean.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Mean.scala
@@ -54,4 +54,7 @@ case class Mean(column: String, where: Option[String] = None)
   }
 
   override def filterCondition: Option[String] = where
+
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set(column))
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala
index ddc4497b2..df870bcd9 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/MinLength.scala
@@ -54,6 +54,9 @@ case class MinLength(column: String, where: Option[String] = None, analyzerOptio
 
   override def filterCondition: Option[String] = where
 
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set(column))
+
   private[deequ] def criterion: Column = {
     val isNullCheck = col(column).isNull
     val colLength = length(col(column)).cast(DoubleType)
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Minimum.scala b/src/main/scala/com/amazon/deequ/analyzers/Minimum.scala
index 701ae0f05..d23d702c2 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Minimum.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Minimum.scala
@@ -64,6 +64,9 @@ case class Minimum(column: String, where: Option[String] = None, analyzerOptions
 
   override def filterCondition: Option[String] = where
 
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set(column))
+
   @VisibleForTesting
   private def criterion: Column = conditionalSelectionWithAugmentedOutcome(col(column), where)
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/PatternMatch.scala b/src/main/scala/com/amazon/deequ/analyzers/PatternMatch.scala
index eb62f9675..9ae003ac4 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/PatternMatch.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/PatternMatch.scala
@@ -58,6 +58,9 @@ case class PatternMatch(column: String, pattern: Regex, where: Option[String] =
 
   override def filterCondition: Option[String] = where
 
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set(column))
+
   override protected def additionalPreconditions(): Seq[StructType => Unit] = {
     hasColumn(column) :: isString(column) :: Nil
   }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala b/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala
index 593d358d4..774e39ef9 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala
@@ -89,4 +89,7 @@ case class RatioOfSums(
   }
 
   override def filterCondition: Option[String] = where
+
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set(numerator, denominator))
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Size.scala b/src/main/scala/com/amazon/deequ/analyzers/Size.scala
index c56083abe..2672da297 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Size.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Size.scala
@@ -48,4 +48,7 @@ case class Size(where: Option[String] = None)
   }
 
   override def filterCondition: Option[String] = where
+
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set.empty)
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/StandardDeviation.scala b/src/main/scala/com/amazon/deequ/analyzers/StandardDeviation.scala
index dcf347af2..1ad2a1ab7 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/StandardDeviation.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/StandardDeviation.scala
@@ -73,4 +73,7 @@ case class StandardDeviation(column: String, where: Option[String] = None)
   }
 
   override def filterCondition: Option[String] = where
+
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set(column))
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Sum.scala b/src/main/scala/com/amazon/deequ/analyzers/Sum.scala
index
535c14209..047c13fcf 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Sum.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Sum.scala
@@ -52,4 +52,7 @@ case class Sum(column: String, where: Option[String] = None)
   }
 
   override def filterCondition: Option[String] = where
+
+  override def columnsReferenced(): Option[Set[String]] =
+    if (where.isDefined) None else Some(Set(column))
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/runners/AnalysisRunner.scala b/src/main/scala/com/amazon/deequ/analyzers/runners/AnalysisRunner.scala
index f86b608d8..abc20a114 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/runners/AnalysisRunner.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/runners/AnalysisRunner.scala
@@ -21,6 +21,7 @@ import com.amazon.deequ.io.DfsUtils
 import com.amazon.deequ.metrics.{DoubleMetric, Metric}
 import com.amazon.deequ.repository.{MetricsRepository, ResultKey}
 import org.apache.spark.sql.Column
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
@@ -360,7 +361,9 @@ object AnalysisRunner {
         val offsets = shareableAnalyzers.scanLeft(0) { case (current, analyzer) =>
           current + analyzer.aggregationFunctions().length
         }
-        val results = data.agg(aggregations.head, aggregations.tail: _*).collect().head
+
+        val prunedData = pruneColumns(data, shareableAnalyzers)
+        val results = prunedData.agg(aggregations.head, aggregations.tail: _*).collect().head
 
         shareableAnalyzers.zip(offsets).map { case (analyzer, offset) =>
           analyzer -> successOrFailureMetricFrom(analyzer, results, offset, aggregateWith, saveStatesTo)
@@ -382,6 +385,52 @@ object AnalysisRunner {
     sharedResults ++ AnalyzerContext(otherMetrics)
   }
 
+  /**
+   * Projects the data down to the columns needed by the given analyzers, when that is safe.
+   * This enables column pruning for V2 DataSource connectors (e.g. Iceberg, Delta Lake)
+   * which make scan-planning decisions before Spark's optimizer can simplify the plan.
+   *
+   * The original DataFrame is returned unchanged when
+   *  - any analyzer cannot statically declare its columns, i.e. columnsReferenced() is None
+   *    (e.g. analyzers with free-form SQL predicates or WHERE clauses),
+   *  - no analyzer declares any column (e.g. only Size), or
+   *  - a declared name is not literally a top-level column of the data. Selecting a nested
+   *    reference such as "struct.field" would expose it under the name "field", so the
+   *    aggregations, which still refer to "struct.field", could no longer be resolved.
+   *
+   * @param data the DataFrame the analyzers would otherwise aggregate over
+   * @param analyzers the analyzers whose aggregations will run on the returned DataFrame
+   * @return the data restricted to the needed columns, or the data itself if pruning is skipped
+   */
+  private[this] def pruneColumns(
+      data: DataFrame,
+      analyzers: Seq[Analyzer[_, _]])
+    : DataFrame = {
+
+    val declaredColumns = analyzers.map(_.columnsReferenced())
+    val topLevelColumns = data.columns.toSet
+
+    // col() treats '.' and '`' as attribute-path syntax. Only a name without either that
+    // matches a top-level column exactly (case-sensitively) keeps its name when selected.
+    def isPlainTopLevelColumn(name: String): Boolean = {
+      topLevelColumns.contains(name) && !name.exists(c => c == '.' || c == '`')
+    }
+
+    if (declaredColumns.exists(_.isEmpty)) {
+      // At least one analyzer cannot declare its columns; skip pruning
+      data
+    } else {
+      val neededColumns = declaredColumns.flatMap(_.getOrElse(Set.empty[String])).distinct
+      if (neededColumns.nonEmpty && neededColumns.forall(isPlainTopLevelColumn)) {
+        data.select(neededColumns.map(col): _*)
+      } else {
+        // Either all analyzers are dataset-level (e.g. Size) or some name cannot be
+        // projected without changing how it resolves; keep every column
+        data
+      }
+    }
+  }
+
   /** Compute scan-shareable analyzer metric from aggregation result, mapping generic exceptions
     * to a failure metric */
   private def successOrFailureMetricFrom(
diff --git a/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala b/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala
index be5bdc5a6..9f930dbd0 100644
--- a/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala
+++ b/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala
@@ -864,6 +864,57 @@ class AnalyzerTests extends AnyWordSpec with Matchers with SparkContextSpec with
       assert(testVal.value.isSuccess)
       assert(testVal.value.toOption.get.isInfinite)
     }
+
+    "return correct columnsReferenced for analyzers without where clauses" in {
+      assert(Completeness("col1").columnsReferenced() === Some(Set("col1")))
+      assert(Mean("col1").columnsReferenced() === Some(Set("col1")))
+      assert(Maximum("col1").columnsReferenced() === Some(Set("col1")))
+      assert(Minimum("col1").columnsReferenced() === Some(Set("col1")))
+      assert(Sum("col1").columnsReferenced() === Some(Set("col1")))
+      assert(StandardDeviation("col1").columnsReferenced() === Some(Set("col1")))
+
assert(ApproxCountDistinct("col1").columnsReferenced() === Some(Set("col1")))
+      assert(DataType("col1").columnsReferenced() === Some(Set("col1")))
+      assert(PatternMatch("col1", ".*".r).columnsReferenced() === Some(Set("col1")))
+      assert(MaxLength("col1").columnsReferenced() === Some(Set("col1")))
+      assert(MinLength("col1").columnsReferenced() === Some(Set("col1")))
+      assert(ExactQuantile("col1", 0.5).columnsReferenced() === Some(Set("col1")))
+      assert(ApproxQuantile("col1", 0.5).columnsReferenced() === Some(Set("col1")))
+      assert(ApproxQuantiles("col1", Seq(0.25, 0.75)).columnsReferenced() === Some(Set("col1")))
+      assert(Correlation("col1", "col2").columnsReferenced() === Some(Set("col1", "col2")))
+      assert(RatioOfSums("col1", "col2").columnsReferenced() === Some(Set("col1", "col2")))
+      assert(Size().columnsReferenced() === Some(Set.empty))
+      assert(ColumnCount().columnsReferenced() === Some(Set.empty))
+      assert(ColumnExists("col1").columnsReferenced() === Some(Set.empty))
+      assert(KLLSketch("col1").columnsReferenced() === Some(Set("col1")))
+    }
+
+    "return None for columnsReferenced when where clause is present" in {
+      assert(Completeness("col1", Some("col2 > 0")).columnsReferenced() === None)
+      assert(Mean("col1", Some("col2 > 0")).columnsReferenced() === None)
+      assert(Maximum("col1", Some("col2 > 0")).columnsReferenced() === None)
+      assert(Minimum("col1", Some("col2 > 0")).columnsReferenced() === None)
+      assert(Sum("col1", Some("col2 > 0")).columnsReferenced() === None)
+      assert(StandardDeviation("col1", Some("col2 > 0")).columnsReferenced() === None)
+      assert(ApproxCountDistinct("col1", Some("col2 > 0")).columnsReferenced() === None)
+      assert(DataType("col1", Some("col2 > 0")).columnsReferenced() === None)
+      assert(PatternMatch("col1", ".*".r, Some("col2 > 0")).columnsReferenced() === None)
+      assert(MaxLength("col1", Some("col2 > 0")).columnsReferenced() === None)
+      assert(MinLength("col1", Some("col2 > 0")).columnsReferenced() === None)
+      assert(ExactQuantile("col1", 0.5, Some("col2 > 0")).columnsReferenced() === None)
+      assert(ApproxQuantile("col1", 0.5, where = Some("col2 > 0")).columnsReferenced() === None)
+      assert(Correlation("col1", "col2", Some("col1 > 0")).columnsReferenced() === None)
+      assert(RatioOfSums("col1", "col2", Some("col1 > 0")).columnsReferenced() === None)
+      assert(Size(Some("col1 > 0")).columnsReferenced() === None)
+    }
+
+    "return None for columnsReferenced for Compliance (free-form SQL)" in {
+      assert(Compliance("test", "col1 > 0").columnsReferenced() === None)
+      assert(Compliance("test", "col1 > 0", columns = List("col1")).columnsReferenced() === None)
+    }
+
+    "return None for columnsReferenced for CustomSql" in {
+      assert(CustomSql("SELECT COUNT(*) FROM table").columnsReferenced() === None)
+    }
   }
 
 }
diff --git a/src/test/scala/com/amazon/deequ/analyzers/runners/AnalysisRunnerTests.scala b/src/test/scala/com/amazon/deequ/analyzers/runners/AnalysisRunnerTests.scala
index 5389d99e3..69bda4e94 100644
--- a/src/test/scala/com/amazon/deequ/analyzers/runners/AnalysisRunnerTests.scala
+++ b/src/test/scala/com/amazon/deequ/analyzers/runners/AnalysisRunnerTests.scala
@@ -407,5 +407,130 @@ class AnalysisRunnerTests extends AnyWordSpec
 
         assert(!computed.metricMap.keys.exists(_.isInstanceOf[Size]))
       }
+
+    "produce correct results with column pruning for analyzers without where clauses" in
+      withSparkSession { session =>
+        val data = getDfWithNumericValues(session)
+
+        // None of these analyzers has a where clause, so each declares its columns (pruning path)
+        val completeness = Completeness("att1")
+        val mean = Mean("att1")
+        val maximum = Maximum("att2")
+        val size = Size()
+
+        val analysis = Analysis()
+          .addAnalyzer(completeness)
+          .addAnalyzer(mean)
+          .addAnalyzer(maximum)
+          .addAnalyzer(size)
+
+        val result = AnalysisRunner.run(data, analysis)
+
+        // Expected values: calculate() on each analyzer individually (no pruning path)
+        val expectedCompleteness = completeness.calculate(data)
+        val expectedMean = mean.calculate(data)
+        val expectedMaximum = maximum.calculate(data)
+        val expectedSize = size.calculate(data)
+
+        // The runner's pruned results must equal the directly computed values exactly
+        assert(result.metric(completeness).get.value === expectedCompleteness.value)
+        assert(result.metric(mean).get.value === expectedMean.value)
+        assert(result.metric(maximum).get.value === expectedMaximum.value)
+        assert(result.metric(size).get.value === expectedSize.value)
+      }
+
+    "produce correct results when analyzers have where clauses (pruning disabled)" in
+      withSparkSession { session =>
+        val data = getDfWithNumericValues(session)
+
+        val completenessWithWhere = Completeness("att1", Some("att2 > 0"))
+        val mean = Mean("att1")
+
+        val analysis = Analysis()
+          .addAnalyzer(completenessWithWhere)
+          .addAnalyzer(mean)
+
+        val result = AnalysisRunner.run(data, analysis)
+
+        // The where clause makes pruning fall back to the full data; values must still match
+        val expectedCompleteness = completenessWithWhere.calculate(data)
+        val expectedMean = mean.calculate(data)
+
+        assert(result.metric(completenessWithWhere).get.value === expectedCompleteness.value)
+        assert(result.metric(mean).get.value === expectedMean.value)
+      }
+
+    "produce correct results when mixing Compliance (unknown columns) with other analyzers" in
+      withSparkSession { session =>
+        val data = getDfWithNumericValues(session)
+
+        val compliance = Compliance("att1 > 0", "att1 > 0", columns = List("att1"))
+        val completeness = Completeness("att2")
+
+        val analysis = Analysis()
+          .addAnalyzer(compliance)
+          .addAnalyzer(completeness)
+
+        val result = AnalysisRunner.run(data, analysis)
+
+        val expectedCompliance = compliance.calculate(data)
+        val expectedCompleteness = completeness.calculate(data)
+
+        assert(result.metric(compliance).get.value === expectedCompliance.value)
+        assert(result.metric(completeness).get.value === expectedCompleteness.value)
+      }
+
+    "produce correct results with only dataset-level analyzers (no column pruning needed)" in
+      withSparkSession { session =>
+        val data = getDfWithNumericValues(session)
+
+        val size = Size()
+        val analysis = Analysis().addAnalyzer(size)
+
+        val result = AnalysisRunner.run(data, analysis)
+
+        val expectedSize = size.calculate(data)
+        assert(result.metric(size).get.value === expectedSize.value)
+      }
+
+    "produce correct results with multi-column analyzers" in
+      withSparkSession { session =>
+        val data = getDfWithNumericValues(session)
+
+        val correlation = Correlation("att1", "att2")
+        val completeness = Completeness("att1")
+
+        val analysis = Analysis()
+          .addAnalyzer(correlation)
+          .addAnalyzer(completeness)
+
+        val result = AnalysisRunner.run(data, analysis)
+
+        val expectedCorrelation = correlation.calculate(data)
+        val expectedCompleteness = completeness.calculate(data)
+
+        assert(result.metric(correlation).get.value === expectedCorrelation.value)
+        assert(result.metric(completeness).get.value === expectedCompleteness.value)
+      }
+
+    "produce correct results when same column is referenced by multiple analyzers" in
+      withSparkSession { session =>
+        val data = getDfWithNumericValues(session)
+
+        val completeness = Completeness("att1")
+        val mean = Mean("att1")
+        val minimum = Minimum("att1")
+
+        val analysis = Analysis()
+          .addAnalyzer(completeness)
+          .addAnalyzer(mean)
+          .addAnalyzer(minimum)
+
+        val result = AnalysisRunner.run(data, analysis)
+
+        assert(result.metric(completeness).get.value === completeness.calculate(data).value)
+        assert(result.metric(mean).get.value === mean.calculate(data).value)
+        assert(result.metric(minimum).get.value === minimum.calculate(data).value)
+      }
   }
 }