Skip to content

Commit 3f5adad

Browse files
olaky authored and Max Gekk committed
[SPARK-50558][SQL] Introduce simpleString for ExpressionSet
### What changes were proposed in this pull request? * Introduce a simpleString method equal to the one for Expression and add it to ExpressionSet * Use it for push down filter logging in DataSourceStrategy * Use it for after scan filter logging in FileSourceStrategy ### Why are the changes needed? Filter expressions can be arbitrarily large and should not be logged completely in these cases ### Does this PR introduce _any_ user-facing change? No, logging is not user facing ### How was this patch tested? Added new tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #49650 from olaky/spark-50558-add-simple-string-for-expression-set. Authored-by: Ole Sasse <ole.sasse@databricks.com> Signed-off-by: Max Gekk <max.gekk@gmail.com> (cherry picked from commit 771d81a) Signed-off-by: Max Gekk <max.gekk@gmail.com>
1 parent 2c32f93 commit 3f5adad

File tree

5 files changed

+44
-5
lines changed

5 files changed

+44
-5
lines changed

sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ object SparkStringUtils extends Logging {
8383
start: String,
8484
sep: String,
8585
end: String,
86-
maxFields: Int): String = {
86+
maxFields: Int,
87+
customToString: Option[T => String] = None): String = {
8788
if (seq.length > maxFields) {
8889
if (truncationWarningPrinted.compareAndSet(false, true)) {
8990
logWarning(
@@ -94,9 +95,17 @@ object SparkStringUtils extends Logging {
9495
val restNum = seq.length - numFields
9596
val ending = (if (numFields == 0) "" else sep) +
9697
(if (restNum == 0) "" else s"... $restNum more fields") + end
97-
seq.take(numFields).mkString(start, sep, ending)
98+
if (customToString.isDefined) {
99+
seq.take(numFields).map(customToString.get).mkString(start, sep, ending)
100+
} else {
101+
seq.take(numFields).mkString(start, sep, ending)
102+
}
98103
} else {
99-
seq.mkString(start, sep, end)
104+
if (customToString.isDefined) {
105+
seq.map(customToString.get).mkString(start, sep, end)
106+
} else {
107+
seq.mkString(start, sep, end)
108+
}
100109
}
101110
}
102111

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.expressions
2020
import scala.collection.mutable
2121
import scala.collection.mutable.ArrayBuffer
2222

23+
import org.apache.spark.sql.catalyst.util.SparkStringUtils
24+
2325
object ExpressionSet {
2426
/**
2527
* Constructs a new [[ExpressionSet]] by applying [[Expression#canonicalized]] to `expressions`.
@@ -178,5 +180,12 @@ class ExpressionSet protected(
178180
|baseSet: ${baseSet.mkString(", ")}
179181
|originals: ${originals.mkString(", ")}
180182
""".stripMargin
183+
184+
/** Returns a length limited string that must be used for logging only. */
185+
def simpleString(maxFields: Int): String = {
186+
val customToString = { e: Expression => e.simpleString(maxFields) }
187+
SparkStringUtils.truncatedString(
188+
seq = originals.toSeq, start = "Set(", sep = ", ", end = ")", maxFields, Some(customToString))
189+
}
181190
}
182191

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,4 +240,18 @@ class ExpressionSetSuite extends SparkFunSuite {
240240
assert((initialSet -- setToRemoveWithOutSameExpression).size == 2)
241241
}
242242

243+
test("simpleString limits the number of expressions recursively") {
244+
val expressionSet =
245+
ExpressionSet(InSet(aUpper, Set(0, 1)) :: Rand(1) :: Rand(2) :: Rand(3) :: Nil)
246+
assert(expressionSet.simpleString(1) ==
247+
"Set(A#1 INSET 0, ... 1 more fields, ... 3 more fields)")
248+
assert(expressionSet.simpleString(2) == "Set(A#1 INSET 0, 1, rand(1), ... 2 more fields)")
249+
assert(expressionSet.simpleString(3) ==
250+
"Set(A#1 INSET 0, 1, rand(1), rand(2), ... 1 more fields)")
251+
assert(expressionSet.simpleString(4) == expressionSet.toString)
252+
253+
// Only one expression, but the simple string for this expression must be truncated.
254+
val expressionSetTwo = ExpressionSet(InSet(aUpper, Set(0, 1, 2, 3, 4)) :: Nil)
255+
assert(expressionSetTwo.simpleString(1) == "Set(A#1 INSET 0, ... 4 more fields)")
256+
}
243257
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
5252
import org.apache.spark.sql.execution.command._
5353
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, PushedDownOperators}
5454
import org.apache.spark.sql.execution.streaming.StreamingRelation
55+
import org.apache.spark.sql.internal.SQLConf
5556
import org.apache.spark.sql.sources
5657
import org.apache.spark.sql.sources._
5758
import org.apache.spark.sql.types._
@@ -519,9 +520,11 @@ object DataSourceStrategy
519520
ExpressionSet(Nil)
520521
} else {
521522
val partitionSet = AttributeSet(partitionColumns)
523+
val maxToStringFields = SQLConf.get.getConf(SQLConf.MAX_TO_STRING_FIELDS)
522524
val predicates = ExpressionSet(normalizedFilters
523525
.flatMap(extractPredicatesWithinOutputSet(_, partitionSet)))
524-
logInfo(log"Pruning directories with: ${MDC(PREDICATES, predicates.mkString(","))}")
526+
logInfo(log"Pruning directories with: ${MDC(PREDICATES,
527+
predicates.simpleString(maxToStringFields))}")
525528
predicates
526529
}
527530
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
3232
import org.apache.spark.sql.catalyst.trees.TreePattern.{PLAN_EXPRESSION, SCALAR_SUBQUERY}
3333
import org.apache.spark.sql.catalyst.types.DataTypeUtils
3434
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
35+
import org.apache.spark.sql.internal.SQLConf
3536
import org.apache.spark.sql.types.{DoubleType, FloatType, StructType}
3637
import org.apache.spark.util.ArrayImplicits._
3738
import org.apache.spark.util.collection.BitSet
@@ -199,6 +200,7 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
199200
Some(f)
200201
}
201202
}
203+
202204
val supportNestedPredicatePushdown =
203205
DataSourceUtils.supportNestedPredicatePushdown(fsRelation)
204206
val pushedFilters = dataFilters
@@ -207,7 +209,9 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
207209

208210
// Predicates with both partition keys and attributes need to be evaluated after the scan.
209211
val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
210-
logInfo(log"Post-Scan Filters: ${MDC(POST_SCAN_FILTERS, afterScanFilters.mkString(","))}")
212+
val maxToStringFields = fsRelation.sparkSession.conf.get(SQLConf.MAX_TO_STRING_FIELDS)
213+
logInfo(log"Post-Scan Filters: ${MDC(POST_SCAN_FILTERS,
214+
afterScanFilters.simpleString(maxToStringFields))}")
211215

212216
val filterAttributes = AttributeSet(afterScanFilters ++ stayUpFilters)
213217
val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects

0 commit comments

Comments (0)