
Commit b5e183c

gengliangwang authored and cloud-fan committed
[SPARK-28108][SQL][test-hadoop3.2] Simplify OrcFilters
## What changes were proposed in this pull request?

In apache#24068, IvanVergiliev fixes the issue that `OrcFilters.createBuilder` has exponential complexity in the height of the filter tree, due to the way the check-and-build pattern is implemented. Compared to the approach in apache#24068, this PR proposes a simpler solution for the issue:

1. Separate the logic of building a convertible filter tree from the actual `SearchArgument` builder, since the two procedures are different and their return types are different. The classes `ActionType`, `TrimUnconvertibleFilters` and `BuildSearchArgument` introduced in apache#24068 can thus be dropped, and the code is more readable.
2. For most of the leaf nodes, the convertible result is always `Some(node)`, which this PR abstracts accordingly.
3. The changes over the previous code are actually small. See apache#24783.

## How was this patch tested?

Run the benchmark provided in apache#24068:

```
val schema = StructType.fromDDL("col INT")
(20 to 30).foreach { width =>
  val whereFilter = (1 to width).map(i => EqualTo("col", i)).reduceLeft(Or)
  val start = System.currentTimeMillis()
  OrcFilters.createFilter(schema, Seq(whereFilter))
  println(s"With $width filters, conversion takes ${System.currentTimeMillis() - start} ms")
}
```

Result:

```
With 20 filters, conversion takes 6 ms
With 21 filters, conversion takes 0 ms
With 22 filters, conversion takes 0 ms
With 23 filters, conversion takes 0 ms
With 24 filters, conversion takes 0 ms
With 25 filters, conversion takes 0 ms
With 26 filters, conversion takes 0 ms
With 27 filters, conversion takes 0 ms
With 28 filters, conversion takes 0 ms
With 29 filters, conversion takes 0 ms
With 30 filters, conversion takes 0 ms
```

Also verified with unit tests.

Closes apache#24910 from gengliangwang/refactorOrcFilters.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
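The two-pass idea behind the patch — first trim the filter tree down to its convertible subtree, then build from a tree that is guaranteed fully convertible — can be sketched outside Spark. Below is a minimal Python illustration (not the actual Scala code); `Leaf.convertible` is a hypothetical stand-in for "this leaf has an ORC conversion", and the string-building pass stands in for the `SearchArgument` builder. Each pass is a single linear traversal, which is what removes the exponential check-while-building cost.

```python
from dataclasses import dataclass
from typing import Optional, Union

@dataclass
class Leaf:
    sql: str
    convertible: bool = True  # stand-in for "a leaf SearchArgument can be built"

@dataclass
class And:
    left: "Node"
    right: "Node"

@dataclass
class Or:
    left: "Node"
    right: "Node"

@dataclass
class Not:
    child: "Node"

Node = Union[Leaf, And, Or, Not]

def trim(node: Node, can_partial: bool = True) -> Optional[Node]:
    """Pass 1: keep only the convertible subtree, or None if nothing survives."""
    if isinstance(node, Leaf):
        return node if node.convertible else None
    if isinstance(node, And):
        l, r = trim(node.left, can_partial), trim(node.right, can_partial)
        if l is not None and r is not None:
            return And(l, r)
        # Dropping one side of an AND is only safe where partial pushdown is allowed.
        return (l if l is not None else r) if can_partial else None
    if isinstance(node, Or):
        # An OR is convertible only if both children are.
        l, r = trim(node.left, can_partial), trim(node.right, can_partial)
        return Or(l, r) if l is not None and r is not None else None
    # Below a NOT, dropping part of a conjunction would change the filter's meaning.
    c = trim(node.child, can_partial=False)
    return Not(c) if c is not None else None

def build(node: Node) -> str:
    """Pass 2: assumes a fully convertible tree, so it never needs to fail."""
    if isinstance(node, Leaf):
        return node.sql
    if isinstance(node, Not):
        return f"NOT({build(node.child)})"
    op = "AND" if isinstance(node, And) else "OR"
    return f"({build(node.left)} {op} {build(node.right)})"

# NOT(a = 2 AND b IN ('1')): keeping only `a = 2` would yield the wrong NOT(a = 2),
# so the whole subtree is discarded.
assert trim(Not(And(Leaf("a = 2"), Leaf("b IN ('1')", convertible=False)))) is None
# At the top level, one side of an AND can be dropped safely.
print(build(trim(And(Leaf("a = 2"), Leaf("b IN ('1')", convertible=False)))))  # a = 2
```

This mirrors the safety rules quoted in the diff below: partial pushdown of `And` is allowed until a `Not` is crossed, and `Or` requires both children to survive.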
1 parent 7c05f61 commit b5e183c

File tree

3 files changed: +431 −639 lines changed
  • sql


sql/core/v1.2.1/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

Lines changed: 137 additions & 210 deletions
```diff
@@ -23,6 +23,7 @@ import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
 import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
 import org.apache.orc.storage.serde2.io.HiveDecimalWritable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 
@@ -64,26 +65,72 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    */
   def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
     val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
-    val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
-    for {
-      // Combines all filters using `And` to produce a single conjunction
-      conjunction <- buildTree(filters)
-      // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
-      builder <- orcFilterConverter.buildSearchArgument(conjunction, newBuilder)
-    } yield builder.build()
+    // Combines all convertible filters using `And` to produce a single conjunction
+    val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, filters))
+    conjunctionOptional.map { conjunction =>
+      // Then tries to build a single ORC `SearchArgument` for the conjunction predicate.
+      // The input predicate is fully convertible. There should not be any empty result in the
+      // following recursive method call `buildSearchArgument`.
+      buildSearchArgument(dataTypeMap, conjunction, newBuilder).build()
+    }
   }
 
   def convertibleFilters(
       schema: StructType,
       dataTypeMap: Map[String, DataType],
       filters: Seq[Filter]): Seq[Filter] = {
-    val orcFilterConverter = new OrcFilterConverter(dataTypeMap)
-    filters.flatMap(orcFilterConverter.trimUnconvertibleFilters)
-  }
+    import org.apache.spark.sql.sources._
 
-}
+    def convertibleFiltersHelper(
+        filter: Filter,
+        canPartialPushDown: Boolean): Option[Filter] = filter match {
+      // At here, it is not safe to just convert one side and remove the other side
+      // if we do not understand what the parent filters are.
+      //
+      // Here is an example used to explain the reason.
+      // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
+      // convert b in ('1'). If we only convert a = 2, we will end up with a filter
+      // NOT(a = 2), which will generate wrong results.
+      //
+      // Pushing one side of AND down is only safe to do at the top level or in the child
+      // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
+      // can be safely removed.
+      case And(left, right) =>
+        val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
+        val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
+        (leftResultOptional, rightResultOptional) match {
+          case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult))
+          case (Some(leftResult), None) if canPartialPushDown => Some(leftResult)
+          case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult)
+          case _ => None
+        }
 
-private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) {
+      // The Or predicate is convertible when both of its children can be pushed down.
+      // That is to say, if one/both of the children can be partially pushed down, the Or
+      // predicate can be partially pushed down as well.
+      //
+      // Here is an example used to explain the reason.
+      // Let's say we have
+      // (a1 AND a2) OR (b1 AND b2),
+      // a1 and b1 is convertible, while a2 and b2 is not.
+      // The predicate can be converted as
+      // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
+      // As per the logical in And predicate, we can push down (a1 OR b1).
+      case Or(left, right) =>
+        for {
+          lhs <- convertibleFiltersHelper(left, canPartialPushDown)
+          rhs <- convertibleFiltersHelper(right, canPartialPushDown)
+        } yield Or(lhs, rhs)
+      case Not(pred) =>
+        val childResultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
+        childResultOptional.map(Not)
+      case other =>
+        for (_ <- buildLeafSearchArgument(dataTypeMap, other, newBuilder())) yield other
+    }
+    filters.flatMap { filter =>
+      convertibleFiltersHelper(filter, true)
+    }
+  }
 
   /**
    * Get PredicateLeafType which is corresponding to the given DataType.
@@ -115,228 +162,108 @@ private class OrcFilterConverter(val dataTypeMap: Map[String, DataType]) {
     case _ => value
   }
 
-  import org.apache.spark.sql.sources._
-  import OrcFilters._
-
   /**
-   * Builds a SearchArgument for a Filter by first trimming the non-convertible nodes, and then
-   * only building the remaining convertible nodes.
+   * Build a SearchArgument and return the builder so far.
    *
-   * Doing the conversion in this way avoids the computational complexity problems introduced by
-   * checking whether a node is convertible while building it. The approach implemented here has
-   * complexity that's linear in the size of the Filter tree - O(number of Filter nodes) - we run
-   * a single pass over the tree to trim it, and then another pass on the trimmed tree to convert
-   * the remaining nodes.
-   *
-   * The alternative approach of checking-while-building can (and did) result
-   * in exponential complexity in the height of the tree, causing perf problems with Filters with
-   * as few as ~35 nodes if they were skewed.
+   * @param dataTypeMap a map from the attribute name to its data type.
+   * @param expression the input predicates, which should be fully convertible to SearchArgument.
+   * @param builder the input SearchArgument.Builder.
+   * @return the builder so far.
    */
-  private[sql] def buildSearchArgument(
+  private def buildSearchArgument(
+      dataTypeMap: Map[String, DataType],
       expression: Filter,
-      builder: Builder): Option[Builder] = {
-    trimUnconvertibleFilters(expression).map { filter =>
-      updateBuilder(filter, builder)
-      builder
-    }
-  }
-
-  /**
-   * Removes all sub-Filters from a given Filter that are not convertible to an ORC SearchArgument.
-   */
-  private[sql] def trimUnconvertibleFilters(expression: Filter): Option[Filter] = {
-    performAction(TrimUnconvertibleFilters(canPartialPushDownConjuncts = true), expression)
-  }
-
-  /**
-   * Builds a SearchArgument for the given Filter. This method should only be called on Filters
-   * that have previously been trimmed to remove unsupported sub-Filters!
-   */
-  private def updateBuilder(expression: Filter, builder: Builder): Unit =
-    performAction(BuildSearchArgument(builder), expression)
-
-  sealed trait ActionType[ReturnType]
-  case class TrimUnconvertibleFilters(canPartialPushDownConjuncts: Boolean)
-    extends ActionType[Option[Filter]]
-  case class BuildSearchArgument(builder: Builder) extends ActionType[Unit]
-
-  // The performAction method can run both the filtering and building operations for a given
-  // node - we signify which one we want with the `actionType` parameter.
-  //
-  // There are a couple of benefits to coupling the two operations like this:
-  // 1. All the logic for a given predicate is grouped logically in the same place. You don't
-  //    have to scroll across the whole file to see what the filter action for an And is while
-  //    you're looking at the build action.
-  // 2. It's much easier to keep the implementations of the two operations up-to-date with
-  //    each other. If the `filter` and `build` operations are implemented as separate case-matches
-  //    in different methods, it's very easy to change one without appropriately updating the
-  //    other. For example, if we add a new supported node type to `filter`, it would be very
-  //    easy to forget to update `build` to support it too, thus leading to conversion errors.
-  private def performAction[ReturnType](
-      actionType: ActionType[ReturnType],
-      expression: Filter): ReturnType = {
-    def getType(attribute: String): PredicateLeaf.Type =
-      getPredicateLeafType(dataTypeMap(attribute))
+      builder: Builder): Builder = {
+    import org.apache.spark.sql.sources._
 
     expression match {
       case And(left, right) =>
-        actionType match {
-          case t @ TrimUnconvertibleFilters(canPartialPushDownConjuncts) =>
-            // At here, it is not safe to just keep one side and remove the other side
-            // if we do not understand what the parent filters are.
-            //
-            // Here is an example used to explain the reason.
-            // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
-            // convert b in ('1'). If we only convert a = 2, we will end up with a filter
-            // NOT(a = 2), which will generate wrong results.
-            //
-            // Pushing one side of AND down is only safe to do at the top level or in the child
-            // AND before hitting NOT or OR conditions, and in this case, the unsupported
-            // predicate can be safely removed.
-            val lhs = performAction(t, left)
-            val rhs = performAction(t, right)
-            (lhs, rhs) match {
-              case (Some(l), Some(r)) => Some(And(l, r))
-              case (Some(_), None) if canPartialPushDownConjuncts => lhs
-              case (None, Some(_)) if canPartialPushDownConjuncts => rhs
-              case _ => None
-            }
-          case b @ BuildSearchArgument(builder) =>
-            builder.startAnd()
-            performAction(b, left)
-            performAction(b, right)
-            builder.end()
-            ()
-        }
+        val lhs = buildSearchArgument(dataTypeMap, left, builder.startAnd())
+        val rhs = buildSearchArgument(dataTypeMap, right, lhs)
+        rhs.end()
 
       case Or(left, right) =>
-        actionType match {
-          case t: TrimUnconvertibleFilters =>
-            // The Or predicate is convertible when both of its children can be pushed down.
-            // That is to say, if one/both of the children can be partially pushed down, the Or
-            // predicate can be partially pushed down as well.
-            //
-            // Here is an example used to explain the reason.
-            // Let's say we have
-            // (a1 AND a2) OR (b1 AND b2),
-            // a1 and b1 is convertible, while a2 and b2 is not.
-            // The predicate can be converted as
-            // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
-            // As per the logical in And predicate, we can push down (a1 OR b1).
-            for {
-              lhs: Filter <- performAction(t, left)
-              rhs: Filter <- performAction(t, right)
-            } yield Or(lhs, rhs)
-          case b @ BuildSearchArgument(builder) =>
-            builder.startOr()
-            performAction(b, left)
-            performAction(b, right)
-            builder.end()
-            ()
-        }
+        val lhs = buildSearchArgument(dataTypeMap, left, builder.startOr())
+        val rhs = buildSearchArgument(dataTypeMap, right, lhs)
+        rhs.end()
 
       case Not(child) =>
-        actionType match {
-          case t: TrimUnconvertibleFilters =>
-            performAction(t.copy(canPartialPushDownConjuncts = false), child).map(Not)
-          case b @ BuildSearchArgument(builder) =>
-            builder.startNot()
-            performAction(b, child)
-            builder.end()
-            ()
+        buildSearchArgument(dataTypeMap, child, builder.startNot()).end()
+
+      case other =>
+        buildLeafSearchArgument(dataTypeMap, other, builder).getOrElse {
+          throw new SparkException(
+            "The input filter of OrcFilters.buildSearchArgument should be fully convertible.")
         }
+    }
+  }
 
-      // NOTE: For all case branches dealing with leaf predicates below, the additional
-      // `startAnd()` call is mandatory. ORC `SearchArgument` builder requires that all leaf
-      // predicates must be wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+  /**
+   * Build a SearchArgument for a leaf predicate and return the builder so far.
+   *
+   * @param dataTypeMap a map from the attribute name to its data type.
+   * @param expression the input filter predicates.
+   * @param builder the input SearchArgument.Builder.
+   * @return the builder so far.
+   */
+  private def buildLeafSearchArgument(
+      dataTypeMap: Map[String, DataType],
+      expression: Filter,
      builder: Builder): Option[Builder] = {
+    def getType(attribute: String): PredicateLeaf.Type =
+      getPredicateLeafType(dataTypeMap(attribute))
+
+    import org.apache.spark.sql.sources._
 
+    // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
+    // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
+    // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+    expression match {
       case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-            builder.startAnd().equals(quotedName, getType(attribute), castedValue).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+        Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end())
+
       case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-            builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+        Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end())
+
       case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-            builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+        Some(builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end())
+
       case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-            builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+        Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end())
+
       case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-            builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+        Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end())
+
       case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-            builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        val castedValue = castLiteralValue(value, dataTypeMap(attribute))
+        Some(builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end())
+
       case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            builder.startAnd().isNull(quotedName, getType(attribute)).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        Some(builder.startAnd().isNull(quotedName, getType(attribute)).end())
+
       case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            builder.startNot().isNull(quotedName, getType(attribute)).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        Some(builder.startNot().isNull(quotedName, getType(attribute)).end())
+
       case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => Some(expression)
-          case BuildSearchArgument(builder) =>
-            val quotedName = quoteAttributeNameIfNeeded(attribute)
-            val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute)))
-            builder.startAnd().in(quotedName, getType(attribute),
-              castedValues.map(_.asInstanceOf[AnyRef]): _*).end()
-            ()
-        }
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute)))
+        Some(builder.startAnd().in(quotedName, getType(attribute),
+          castedValues.map(_.asInstanceOf[AnyRef]): _*).end())
 
-      case _ =>
-        actionType match {
-          case _: TrimUnconvertibleFilters => None
-          case BuildSearchArgument(builder) =>
-            throw new IllegalArgumentException(s"Can't build unsupported filter ${expression}")
-        }
+      case _ => None
     }
   }
 }
```
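The new `buildSearchArgument` threads a single builder through the recursion: composite nodes open a scope with `startAnd()`/`startOr()`/`startNot()` and close it with `end()`, and every leaf is still wrapped in a parent scope because the ORC builder requires it. A toy Python mock of such a builder API (hypothetical names and semantics, not the real ORC `SearchArgument.Builder`) shows the shape of that recursion:

```python
class MockBuilder:
    """A toy stand-in for a SearchArgument-style builder that records its calls."""
    def __init__(self):
        self.ops = []
    def start_and(self):
        self.ops.append("(AND"); return self
    def start_or(self):
        self.ops.append("(OR"); return self
    def start_not(self):
        self.ops.append("(NOT"); return self
    def equals(self, name, value):
        self.ops.append(f"{name}={value}"); return self
    def end(self):
        self.ops.append(")"); return self
    def build(self):
        return " ".join(self.ops)

def build_search_argument(expr, builder):
    """Mirrors the shape of the Scala method: each recursive call returns the
    same builder, so calls chain naturally and every scope is closed once."""
    kind = expr[0]
    if kind == "and":
        lhs = build_search_argument(expr[1], builder.start_and())
        rhs = build_search_argument(expr[2], lhs)
        return rhs.end()
    if kind == "not":
        return build_search_argument(expr[1], builder.start_not()).end()
    if kind == "eq":
        # Leaf predicates must still be wrapped in a "parent" scope.
        return builder.start_and().equals(expr[1], expr[2]).end()
    raise ValueError(f"unsupported: {kind}")

sarg = build_search_argument(("and", ("eq", "a", 2), ("eq", "b", 3)), MockBuilder())
print(sarg.build())  # (AND (AND a=2 ) (AND b=3 ) )
```

Because `convertibleFilters` has already discarded anything unsupported, the build pass can treat an unconvertible node as a hard error rather than a value to propagate, which is exactly why the Scala code can throw `SparkException` in its `other` branch.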
