@@ -17,18 +17,20 @@ package org.apache.spark.sql.delta.stats
 
 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
 import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.execution.InSubqueryExec
-import org.apache.spark.sql.functions.{col, from_json, lit, substring, to_json}
+import org.apache.spark.sql.expressions.SparkUserDefinedFunction
+import org.apache.spark.sql.functions.{coalesce, col, from_json, lit, struct, substring, to_json}
 import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.delta.ClassicColumnConversions._
 import org.apache.spark.sql.delta.DeltaColumnMapping
 import org.apache.spark.sql.delta.DeltaTableUtils.isPredicatePartitionColumnsOnly
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.stats.DeltaStatistics.{MIN, MAX, NULL_COUNT, NUM_RECORDS}
-import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{BooleanType, DataType, LongType, StringType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 // scalastyle:on import.ordering.noEmptyLine
 
@@ -240,7 +242,7 @@ object DataFiltersBuilderUtils {
         Seq(stat,
           StatsColumn(NULL_COUNT, stat.pathToColumn, LongType),
           StatsColumn(NUM_RECORDS, pathToColumn = Nil, LongType))
-      case _ =>
+      case _ =>
         Seq(stat)
     }
   }
@@ -477,6 +479,160 @@ object DataFiltersBuilderUtils {
       spark: SparkSession): (Seq[Expression], Seq[Expression]) = {
     filterExprs.partition(isPredicatePartitionColumnsOnly(_, partitionColumns, spark))
   }
+
+  // ==================== Size Collector (shared V1/V2) ==============
+
+  /**
+   * Input encoders for the size collector UDF.
+   * Moved from the DataSkippingReader companion object to be shared.
+   */
+  lazy val sizeCollectorInputEncoders
+      : Seq[Option[ExpressionEncoder[_]]] = Seq(
+    Option(ExpressionEncoder[Boolean]()),
+    Option(ExpressionEncoder[java.lang.Long]()),
+    Option(ExpressionEncoder[java.lang.Long]()),
+    Option(ExpressionEncoder[java.lang.Long]()))
+
+  /**
+   * Build a UDF-based filter that counts bytes/rows/files via an
+   * [[ArrayAccumulator]]. The UDF is a pass-through boolean filter
+   * with the side effect of accumulating size statistics.
+   *
+   * Moved from the V1 DataSkippingReader.buildSizeCollectorFilter to be
+   * shared between V1 and V2.
+   *
+   * Accumulator slots:
+   *   0 - bytes compressed (from the `size` column)
+   *   1 - physical row count (from the numRecords stat, -1 if unknown)
+   *   2 - file count (1 per file)
+   *   3 - logical row count (physical minus DV cardinality, -1 if unknown)
+   *
+   * @param spark SparkSession (for accumulator registration)
+   * @param numRecordsCol Column for stats.numRecords (V1 may use
+   *                      column mapping; V2 uses col("stats.numRecords"))
+   * @return (accumulator, filterFn) where filterFn wraps a boolean
+   *         Column with the size-collecting UDF
+   */
+  def buildSizeCollectorFilter(
+      spark: SparkSession,
+      numRecordsCol: Column
+  ): (ArrayAccumulator, Column => Column) = {
+    val bytesCompressed = col("size")
+    val dvCardinality =
+      coalesce(col("deletionVector.cardinality"), lit(0L))
+    val logicalRows =
+      (numRecordsCol - dvCardinality).as("logicalRows")
+
+    val accumulator = new ArrayAccumulator(4)
+    spark.sparkContext.register(accumulator)
+
+    val collector = (
+        include: Boolean,
+        bytesCompressed: java.lang.Long,
+        logicalRows: java.lang.Long,
+        rows: java.lang.Long) => {
+      if (include) {
+        accumulator.add((0, bytesCompressed))
+        accumulator.add(
+          (1, Option(rows).map(_.toLong).getOrElse(-1L)))
+        accumulator.add((2, 1))
+        accumulator.add(
+          (3, Option(logicalRows)
+            .map(_.toLong).getOrElse(-1L)))
+      }
+      include
+    }
+    val collectorUdf = SparkUserDefinedFunction(
+      f = collector,
+      dataType = BooleanType,
+      inputEncoders = sizeCollectorInputEncoders,
+      deterministic = false)
+
+    (accumulator,
+      collectorUdf(
+        _: Column, bytesCompressed, logicalRows, numRecordsCol))
+  }
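
A minimal caller sketch (hypothetical, not part of this patch): `allFiles` and the `partitionValues.year` column are assumed inputs, and `DataSize` is the existing helper used elsewhere in this file. The returned function wraps any boolean Column without changing which rows pass; the accumulator only carries meaningful totals once the filtered DataFrame has actually been executed.

    // Illustrative only: `allFiles` (a DataFrame of AddFile rows with `size`,
    // `deletionVector.cardinality` and a parsed `stats` struct) is assumed.
    val (acc, sizeFilter) = DataFiltersBuilderUtils.buildSizeCollectorFilter(
      spark, numRecordsCol = col("stats.numRecords"))

    // The UDF returns its boolean input unchanged, so this keeps exactly the
    // rows that col("partitionValues.year") === "2024" would keep on its own.
    val kept = allFiles.where(sizeFilter(col("partitionValues.year") === "2024"))

    kept.collect()            // materializing the DataFrame drives the accumulator
    val size = DataSize(acc)  // bytesCompressed / rows / files / logicalRows
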
+
+  // ==================== Scan Pipeline ==============================
+
+  /**
+   * Result of the shared scan pipeline.
+   *
+   * The accumulators are populated as a side effect when
+   * [[filteredDF]] is materialized (collected / iterated).
+   * Call [[totalSize]] / [[partitionSize]] / [[scanSize]]
+   * ONLY after the DataFrame has been fully consumed.
+   */
+  case class ScanPipelineResult(
+      filteredDF: DataFrame,
+      totalAccumulator: ArrayAccumulator,
+      partitionAccumulator: ArrayAccumulator,
+      scanAccumulator: ArrayAccumulator) {
+    def totalSize: DataSize = DataSize(totalAccumulator)
+    def partitionSize: DataSize = DataSize(partitionAccumulator)
+    def scanSize: DataSize = DataSize(scanAccumulator)
+  }
+
+  /**
+   * Execute the shared scan pipeline: partition pruning + data
+   * skipping + size collection in ONE DataFrame pass.
+   *
+   * Both V1 and V2 delegate here. Three accumulator-instrumented
+   * UDF filters are applied together:
+   *   1. totalFilter(TRUE)            - counts ALL files
+   *   2. partFilter(partitionFilters) - counts partition-matched files
+   *   3. scanFilter(safeDataFilter)   - counts files kept after data skipping
+   *
+   * @param withStatsDF DataFrame with a parsed stats column
+   * @param partitionFilters Partition filter as a Column expression
+   *                         (already rewritten to partitionValues.*)
+   * @param dataSkippingPred Data skipping predicate
+   * @param getStatColumn Resolves a StatsColumn to a Column
+   * @param numRecordsCol Column for stats.numRecords
+   * @param spark SparkSession
+   * @return ScanPipelineResult (DataFrame + accumulators)
+   */
+  def executeScanPipeline(
+      withStatsDF: DataFrame,
+      partitionFilters: Column,
+      dataSkippingPred: DataSkippingPredicate,
+      getStatColumn: StatsColumn => Option[Column],
+      numRecordsCol: Column,
+      spark: SparkSession): ScanPipelineResult = {
+    val trueLit: Column = Column(TrueLiteral)
+
+    val (totalAcc, totalFilter) =
+      buildSizeCollectorFilter(spark, numRecordsCol)
+    val (partAcc, partFilter) =
+      buildSizeCollectorFilter(spark, numRecordsCol)
+    val (scanAcc, scanFilter) =
+      buildSizeCollectorFilter(spark, numRecordsCol)
+
+    val safeFilter =
+      buildSafeSkippingFilter(dataSkippingPred, getStatColumn)
+
+    val filteredDF = withStatsDF.where(
+      totalFilter(trueLit) &&
+        partFilter(partitionFilters) &&
+        scanFilter(safeFilter))
+
+    ScanPipelineResult(filteredDF, totalAcc, partAcc, scanAcc)
+  }
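
A rough delegation sketch for a hypothetical V1/V2 caller. Everything outside executeScanPipeline, buildPartitionFilterColumn and ScanPipelineResult (withStatsDF, partitionExprs, skippingPredicate, getStatColumnOpt) is assumed to be in scope for illustration, and the ordering shows the intended contract: consume the DataFrame before reading the sizes.

    // Hypothetical caller: the inputs below are assumptions, not code from this change.
    val result = DataFiltersBuilderUtils.executeScanPipeline(
      withStatsDF = withStatsDF,
      partitionFilters =
        DataFiltersBuilderUtils.buildPartitionFilterColumn(partitionExprs),
      dataSkippingPred = skippingPredicate,
      getStatColumn = getStatColumnOpt,   // StatsColumn => Option[Column]
      numRecordsCol = col("stats.numRecords"),
      spark = spark)

    // Consume the DataFrame first; only then are the accumulators valid.
    val files = result.filteredDF.collect()
    val scanned = result.scanSize         // files kept after data skipping
    val total = result.totalSize          // all files seen by the scan
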
+
+  /**
+   * Build a partition filter Column from rewritten partition
+   * expressions. Convenience for callers that have already split
+   * and rewritten their partition filters.
+   *
+   * @return Column that is TRUE when no partition filters exist
+   */
+  def buildPartitionFilterColumn(
+      partitionFilterExprs: Seq[Expression]): Column = {
+    partitionFilterExprs
+      .reduceLeftOption(And)
+      .map(e => Column(e))
+      .getOrElse(Column(TrueLiteral))
+  }
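
A small behaviour sketch with hypothetical partition expressions: an empty input collapses to a literal TRUE column, so callers can pass the result to a filter unconditionally, while multiple expressions are folded left with And.

    // No partition filters: the resulting column is simply TRUE.
    val none = DataFiltersBuilderUtils.buildPartitionFilterColumn(Nil)

    // Two rewritten partition expressions are combined with And.
    val both = DataFiltersBuilderUtils.buildPartitionFilterColumn(Seq(
      EqualTo(UnresolvedAttribute("partitionValues.date"), Literal("2024-01-01")),
      IsNotNull(UnresolvedAttribute("partitionValues.region"))))
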
 }
 
 /**