Skip to content

Commit 81f27b3

Browse files
authored
[Spark] Restrict partition-like data filters to whitelist of known-good expressions (delta-io#3872)
1 parent 4d2c5cf commit 81f27b3

File tree

2 files changed

+128
-15
lines changed

2 files changed

+128
-15
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala

Lines changed: 95 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
3939
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
4040
import org.apache.spark.sql.catalyst.expressions._
4141
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
42-
import org.apache.spark.sql.catalyst.expressions.objects.InvokeLike
4342
import org.apache.spark.sql.catalyst.util.TypeUtils
4443
import org.apache.spark.sql.execution.InSubqueryExec
4544
import org.apache.spark.sql.expressions.SparkUserDefinedFunction
@@ -597,6 +596,93 @@ trait DataSkippingReaderBase
597596
maxExpr: Expression,
598597
nullCountExpr: Expression)
599598

599+
/**
600+
* Whitelist of expressions that can be rewritten as partition-like.
601+
* Set to a finite list to avoid having to silently introducing correctness issues as new
602+
* expressions that violate the assumptions of partition-like skipping are introduced.
603+
* There's no need to include [[SkippingEligibleColumn]] here - it's already handled explicitly.
604+
*
605+
* The following expressions have been intentionally excluded from the whitelist of supported
606+
* expressions:
607+
* - [[AttributeReference]]: Any non-skipping eligible column references can't be rewritten as
608+
* partition-like.
609+
* - Any nondeterministic expression: The value returned while skipping might be different when
610+
* the expression is evaluated again. For example, rand() > 0.5 would return ~25% of records
611+
* if used in data skipping, while the user would expect ~50% of records to be returned.
612+
* - [[UserDefinedExpression]]: Often nondeterministic, and may have side effects when executed
613+
* multiple times.
614+
* - [[RegExpReplace]], [[RegExpExtractBase]], [[Like]], [[MultiLikeBase]], [[InvokeLike]], and
615+
* [[JsonToStructs]]: These expressions might be very expensive to evalute more than once.
616+
*/
617+
private def shouldRewriteAsPartitionLike(expr: Expression): Boolean = expr match {
618+
// Expressions supported by traditional data skipping.
619+
// Boolean operators. AND is explicitly handled by the caller.
620+
case _: Not | _: Or => true
621+
// Comparison operators.
622+
case _: EqualNullSafe | _: EqualTo | _: GreaterThan | _: GreaterThanOrEqual | _: IsNull |
623+
_: IsNotNull | _: LessThan | _: LessThanOrEqual => true
624+
// String and set operators. InSubqueryExec is explicitly handled by the caller.
625+
case _: In | _: InSet | _: StartsWith => true
626+
case _: Literal => true
627+
628+
// Expressions only supported for partition-like data skipping.
629+
// Date and time conversions.
630+
case _: ConvertTimezone | _: DateFormatClass | _: Extract | _: GetDateField |
631+
_: GetTimeField | _: IntegralToTimestampBase | _: MakeDate | _: MakeTimestamp |
632+
_: ParseToDate | _: ParseToTimestamp | _: ToTimestamp | _: TruncDate |
633+
_: TruncTimestamp | _: UTCTimestamp => true
634+
// Unix date and timestamp conversions.
635+
case _: DateFromUnixDate | _: FromUnixTime | _: TimestampToLongBase | _: ToUnixTimestamp |
636+
_: UnixDate | _: UnixTime | _: UnixTimestamp => true
637+
// Date and time arithmetic.
638+
case _: AddMonthsBase | _: DateAdd | _: DateAddInterval | _: DateDiff | _: DateSub |
639+
_: DatetimeSub | _: LastDay | _: MonthsBetween | _: NextDay | _: SubtractDates |
640+
_: SubtractTimestamps | _: TimeAdd | _: TimestampAdd | _: TimestampAddYMInterval |
641+
_: TimestampDiff | _: TruncInstant => true
642+
// String expressions.
643+
case _: Base64 | _: BitLength | _: Chr | _: ConcatWs | _: Decode | _: Elt | _: Empty2Null |
644+
_: Encode | _: FormatNumber | _: FormatString | _: ILike | _: InitCap | _: Left |
645+
_: Length | _: Levenshtein | _: Luhncheck | _: OctetLength | _: Overlay | _: Right |
646+
_: Sentences | _: SoundEx | _: SplitPart | _: String2StringExpression |
647+
_: String2TrimExpression | _: StringDecode | _: StringInstr | _: StringLPad |
648+
_: StringLocate | _: StringPredicate | _: StringRPad | _: StringRepeat |
649+
_: StringReplace | _: StringSpace | _: StringSplit | _: StringSplitSQL |
650+
_: StringTranslate | _: StringTrimBoth | _: Substring | _: SubstringIndex | _: ToBinary |
651+
_: TryToBinary | _: UnBase64 => true
652+
// Arithmetic expressions.
653+
case _: Abs | _: BinaryArithmetic | _: Greatest | _: Least | _: UnaryMinus |
654+
_: UnaryPositive => true
655+
// Array expressions.
656+
case _: ArrayBinaryLike | _: ArrayCompact | _: ArrayContains | _: ArrayInsert | _: ArrayJoin |
657+
_: ArrayMax | _: ArrayMin | _: ArrayPosition | _: ArrayRemove | _: ArrayRepeat |
658+
_: ArraySetLike | _: ArraySize | _: ArraysZip |
659+
_: BinaryArrayExpressionWithImplicitCast | _: Concat | _: CreateArray | _: ElementAt |
660+
_: Flatten | _: Get | _: GetArrayItem | _: GetArrayStructFields |
661+
_: Reverse | _: Sequence | _: Size | _: Slice | _: SortArray | _: TryElementAt => true
662+
// Map expressions.
663+
case _: CreateMap | _: GetMapValue | _: MapConcat | _: MapContainsKey | _: MapEntries |
664+
_: MapFromArrays | _: MapFromEntries | _: MapKeys | _: MapValues | _: StringToMap => true
665+
// Struct expressions.
666+
case _: CreateNamedStruct | _: DropField | _: GetStructField | _: UpdateFields |
667+
_: WithField => true
668+
// Hash expressions.
669+
case _: Crc32 | _: HashExpression[_] | _: Md5 | _: Sha1 | _: Sha2 => true
670+
// URL expressions.
671+
case _: ParseUrl | _: UrlDecode | _: UrlEncode => true
672+
// NULL expressions.
673+
case _: AtLeastNNonNulls | _: Coalesce | _: IsNaN | _: NaNvl | _: NullIf | _: Nvl |
674+
_: Nvl2 => true
675+
// Cast expressions.
676+
case _: Cast | _: UpCast => true
677+
// Conditional expressions.
678+
case _: If | _: CaseWhen => true
679+
case _: Alias => true
680+
681+
// Don't attempt partition-like skipping on any unknown expressions: there's no way to
682+
// guarantee it's safe to do so.
683+
case _ => false
684+
}
685+
600686
/**
601687
* Rewrites the references in an expression to point to the collected stats over that column
602688
* (if possible).
@@ -650,10 +736,10 @@ trait DataSkippingReaderBase
650736
}
651737
// For other attribute references, we can't safely rewrite the expression.
652738
case SkippingEligibleColumn(_, _) => None
653-
// Don't attempt data skipping on a nondeterministic expression, since the value returned
654-
// might be different when executed twice on the same input.
655-
// For example, rand() > 0.5 would return ~25% of records if used in data skipping, while the
656-
// user would expect ~50% of records to be returned.
739+
// Explicitly disallow rewriting nondeterministic expressions. Even though this check isn't
740+
// strictly necessary (there shouldn't be any nondeterministic expressions in the whitelist),
741+
// defensively keep it due to the extreme risk of correctness issues if any nondeterministic
742+
// expressions sneak into the whitelist.
657743
case other if !other.deterministic => None
658744
// Inline subquery results to support InSet. The subquery should generally have already been
659745
// evaluated.
@@ -667,13 +753,6 @@ trait DataSkippingReaderBase
667753
Some(InSet(rewrittenChildren, possiblyNullValues.toSet), referencedStats)
668754
}
669755
}
670-
// Don't allow rewriting UDFs - even if deterministic, UDFs might have some unexpected
671-
// side effects when executed twice.
672-
case _: UserDefinedExpression => None
673-
// Don't attempt to rewrite expressions might be extremely expensive to invoke twice.
674-
case _: RegExpReplace | _: RegExpExtractBase | _: Like | _: MultiLikeBase => None
675-
case _: InvokeLike => None
676-
case _: JsonToStructs => None
677756
// Pushdown NOT through OR - we prefer AND to OR because AND can tolerate one branch not being
678757
// rewriteable.
679758
case Not(Or(e1, e2)) =>
@@ -688,14 +767,16 @@ trait DataSkippingReaderBase
688767
Some((And(newLeft, newRight), statsLeft ++ statsRight))
689768
case _ => leftResult.orElse(rightResult)
690769
}
691-
// For all other expressions, recursively rewrite the children.
692-
case other =>
770+
// For all other eligible expressions, recursively rewrite the children.
771+
case other if shouldRewriteAsPartitionLike(other) =>
693772
val childResults = other.children.map(
694773
rewriteDataFiltersAsPartitionLikeInternal(_, clusteringColumnPaths))
695774
Option.whenNot (childResults.exists(_.isEmpty)) {
696775
val (children, stats) = childResults.map(_.get).unzip
697776
(other.withNewChildren(children), stats.flatten.toSet)
698777
}
778+
// Don't attempt rewriting any non-whitelisted expressions.
779+
case _ => None
699780
}
700781

701782
/**

spark/src/test/scala/org/apache/spark/sql/delta/stats/PartitionLikeDataSkippingSuite.scala

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter
2727
import org.apache.spark.SparkConf
2828
import org.apache.spark.sql.QueryTest
2929
import org.apache.spark.sql.catalyst.TableIdentifier
30-
import org.apache.spark.sql.functions.{col, concat, lit, struct}
30+
import org.apache.spark.sql.functions.{array, col, concat, lit, struct}
3131
import org.apache.spark.sql.test.SharedSparkSession
3232

3333
trait PartitionLikeDataSkippingSuiteBase
@@ -341,6 +341,38 @@ trait PartitionLikeDataSkippingSuiteBase
341341
allPredicatesUsed = true,
342342
minNumFilesToApply = 1)
343343
}
344+
345+
test("partition-like data skipping expression references non-skipping eligible columns") {
346+
val tbl = "tbl"
347+
withClusteredTable(
348+
table = tbl,
349+
schema = "a BIGINT, b ARRAY<BIGINT>, c STRUCT<d ARRAY<BIGINT>, e BIGINT>",
350+
clusterBy = "a") {
351+
spark.range(10)
352+
.withColumnRenamed("id", "a")
353+
.withColumn("b", array(col("a"), lit(0L)))
354+
.withColumn("c", struct(array(col("a"), lit(0L)), lit(0L)))
355+
.select("a", "b", "c") // Reorder columns to ensure the schema matches.
356+
.repartitionByRange(10, col("a"))
357+
.write.format("delta").mode("append").insertInto(tbl)
358+
359+
// All files should be read because the filters are on columns that aren't skipping eligible.
360+
validateExpectedScanMetrics(
361+
tableName = tbl,
362+
query = s"SELECT * FROM $tbl WHERE GET(b, 1) = 0",
363+
expectedNumFiles = 10,
364+
expectedNumPartitionLikeDataFilters = 0,
365+
allPredicatesUsed = false,
366+
minNumFilesToApply = 1)
367+
validateExpectedScanMetrics(
368+
tableName = tbl,
369+
query = s"SELECT * FROM $tbl WHERE GET(c.d, 1) = 0",
370+
expectedNumFiles = 10,
371+
expectedNumPartitionLikeDataFilters = 0,
372+
allPredicatesUsed = false,
373+
minNumFilesToApply = 1)
374+
}
375+
}
344376
}
345377

346378
class PartitionLikeDataSkippingSuite extends PartitionLikeDataSkippingSuiteBase

0 commit comments

Comments
 (0)