1717
1818package org .apache .spark .sql .catalyst .analysis
1919
20+ import org .apache .spark .SparkException
2021import org .apache .spark .sql .AnalysisException
2122import org .apache .spark .sql .catalyst .{FunctionIdentifier , InternalRow , TableIdentifier }
2223import org .apache .spark .sql .catalyst .expressions ._
@@ -358,20 +359,23 @@ abstract class Star extends LeafExpression with NamedExpression {
358359 def expand (input : LogicalPlan , resolver : Resolver ): Seq [NamedExpression ]
359360}
360361
361-
362362/**
363363 * Represents all of the input attributes to a given relational operator, for example in
364364 * "SELECT * FROM ...".
365+ * "SELECT * FROM ..." or "SELECT * EXCEPT(...) FROM ..."
365366 *
366367 * This is also used to expand structs. For example:
367368 * "SELECT record.* from (SELECT struct(a,b,c) as record ...)
368369 *
369370 * @param target an optional name that should be the target of the expansion. If omitted all
370371 * targets' columns are produced. This can either be a table name or struct name. This
371372 * is a list of identifiers that is the path of the expansion.
372- */
373- case class UnresolvedStar (target : Option [Seq [String ]]) extends Star with Unevaluable {
374-
373+ *
374+ * This class provides the shared behavior between the classes for SELECT * ([[UnresolvedStar ]])
375+ * and SELECT * EXCEPT ([[UnresolvedStarExcept ]]). [[UnresolvedStar ]] is just a case class of this,
376+ * while [[UnresolvedStarExcept ]] adds some additional logic to the expand method.
377+ */
378+ abstract class UnresolvedStarBase (target : Option [Seq [String ]]) extends Star with Unevaluable {
375379 /**
376380 * Returns true if the nameParts is a subset of the last elements of qualifier of the attribute.
377381 *
@@ -383,7 +387,7 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
383387 * - `SELECT t.* FROM ns1.ns2.t` where nameParts is Seq("t") and
384388 * qualifier is Seq("ns1", "ns2", "t").
385389 */
386- private def matchedQualifier (
390+ protected def matchedQualifier (
387391 attribute : Attribute ,
388392 nameParts : Seq [String ],
389393 resolver : Resolver ): Boolean = {
@@ -444,6 +448,148 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
444448 override def toString : String = target.map(_.mkString(" " , " ." , " ." )).getOrElse(" " ) + " *"
445449}
446450
451+ /**
452+ * Represents some of the input attributes to a given relational operator, for example in
453+ * "SELECT * EXCEPT(a) FROM ...".
454+ *
455+ * @param target an optional name that should be the target of the expansion. If omitted all
456+ * targets' columns are produced. This can only be a table name. This
457+ * is a list of identifiers that is the path of the expansion.
458+ *
459+ * @param excepts a list of names that should be excluded from the expansion.
460+ *
461+ */
462+ case class UnresolvedStarExcept (target : Option [Seq [String ]], excepts : Seq [Seq [String ]])
463+ extends UnresolvedStarBase (target) {
464+
465+ /**
466+ * We expand the * EXCEPT by the following three steps:
467+ * 1. use the original .expand() to get top-level column list or struct expansion
468+ * 2. resolve excepts (with respect to the Seq[NamedExpression] returned from (1))
469+ * 3. filter the expanded columns with the resolved except list. recursively apply filtering in
470+ * case of nested columns in the except list (in order to rewrite structs)
471+ */
472+ override def expand (input : LogicalPlan , resolver : Resolver ): Seq [NamedExpression ] = {
473+ // Use the UnresolvedStarBase expand method to get a seq of NamedExpressions corresponding to
474+ // the star expansion. This will yield a list of top-level columns from the logical plan's
475+ // output, or in the case of struct expansion (e.g. target=`x` for SELECT x.*) it will give
476+ // a seq of Alias wrapping the struct field extraction.
477+ val expandedCols = super .expand(input, resolver)
478+
479+ // resolve except list with respect to the expandedCols
480+ val resolvedExcepts = excepts.map { exceptParts =>
481+ AttributeSeq (expandedCols.map(_.toAttribute)).resolve(exceptParts, resolver).getOrElse {
482+ val orderedCandidates = StringUtils .orderSuggestedIdentifiersBySimilarity(
483+ UnresolvedAttribute (exceptParts).name, expandedCols.map(a => a.qualifier :+ a.name))
484+ // if target is defined and expandedCols does not include any Attributes, it must be struct
485+ // expansion; give message suggesting to use unqualified names of nested fields.
486+ throw QueryCompilationErrors
487+ .unresolvedColumnError(UnresolvedAttribute (exceptParts).name, orderedCandidates)
488+ }
489+ }
490+
491+ // Convert each resolved except into a pair of (col: Attribute, nestedColumn) representing the
492+ // top level column in expandedCols that we must 'filter' based on nestedColumn.
493+ @ scala.annotation.tailrec
494+ def getRootColumn (expr : Expression , nestedColumn : Seq [String ] = Nil )
495+ : (NamedExpression , Seq [String ]) = expr match {
496+ case GetStructField (fieldExpr, _, Some (fieldName)) =>
497+ getRootColumn(fieldExpr, fieldName +: nestedColumn)
498+ case e : NamedExpression => e -> nestedColumn
499+ case other : ExtractValue => throw new AnalysisException (
500+ errorClass = " EXCEPT_NESTED_COLUMN_INVALID_TYPE" ,
501+ messageParameters = Map (" columnName" -> other.sql, " dataType" -> other.dataType.toString))
502+ }
503+ // An exceptPair represents the column in expandedCols along with the path of a nestedColumn
504+ // that should be except-ed. Consider two examples:
505+ // 1. excepting the entire col1 = (col1, Seq())
506+ // 2. excepting a nested field in col2, col2.a.b = (col2, Seq(a, b))
507+ // INVARIANT: we rely on the structure of the resolved except being an Alias of GetStructField
508+ // in the case of nested columns.
509+ val exceptPairs = resolvedExcepts.map {
510+ case Alias (exceptExpr, name) => getRootColumn(exceptExpr)
511+ case except : NamedExpression => except -> Seq .empty
512+ }
513+
514+ // Filter columns which correspond to ones listed in the except list and return a new list of
515+ // columns which exclude the columns in the except list. The 'filtering' manifests as either
516+ // dropping the column from the list of columns we return, or rewriting the projected column in
517+ // order to remove excepts that refer to nested columns. For example, with the example above:
518+ // excepts = Seq(
519+ // (col1, Seq()), => filter col1 from the output
520+ // (col2, Seq(a, b)) => rewrite col2 in the output so that it doesn't include the nested field
521+ // ) corresponding to col2.a.b
522+ //
523+ // This occurs in two steps:
524+ // 1. group the excepts by the column they refer to (groupedExcepts)
525+ // 2. filter/rewrite input columns based on four cases:
526+ // a. column doesn't match any groupedExcepts => column unchanged
527+ // b. column exists in groupedExcepts and:
528+ // i. none of remainingExcepts are empty => recursively apply filterColumns over the
529+ // struct fields in order to rewrite the struct
530+ // ii. a remainingExcept is empty, but there are multiple remainingExcepts => we must
531+ // have duplicate/overlapping excepts - throw an error
532+ // iii. [otherwise] remainingExcept is exactly Seq(Seq()) => this is the base 'filtering'
533+ // case. we omit the column from the output (this is a column we would like to
534+ // except). NOTE: this case isn't explicitly listed in the `collect` below since we
535+ // 'collect' columns which match the cases above and omit ones that fall into this
536+ // remaining case.
537+ def filterColumns (columns : Seq [NamedExpression ], excepts : Seq [(NamedExpression , Seq [String ])])
538+ : Seq [NamedExpression ] = {
539+ // group the except pairs by the column they refer to. NOTE: no groupMap until scala 2.13
540+ val groupedExcepts : AttributeMap [Seq [Seq [String ]]] =
541+ AttributeMap (excepts.groupBy(_._1.toAttribute).view.mapValues(v => v.map(_._2)))
542+
543+ // map input columns while searching for the except entry corresponding to the current column
544+ columns.map(col => col -> groupedExcepts.get(col.toAttribute)).collect {
545+ // pass through columns that don't match anything in groupedExcepts
546+ case (col, None ) => col
547+ // found a match but nestedExcepts has remaining excepts - recurse to rewrite the struct
548+ case (col, Some (nestedExcepts)) if nestedExcepts.forall(_.nonEmpty) =>
549+ val fields = col.dataType match {
550+ case s : StructType => s.fields
551+ // we shouldn't be here since we EXCEPT_NEXTED_COLUMN_INVALID_TYPE in getRootColumn
552+ // for this case - just throw internal error
553+ case _ => throw SparkException .internalError(" Invalid column type" )
554+ }
555+ val extractedFields = fields.zipWithIndex.map { case (f, i) =>
556+ Alias (GetStructField (col, i), f.name)()
557+ }
558+ val newExcepts = nestedExcepts.map { nestedExcept =>
559+ // INVARIANT: we cannot have duplicate column names in nested columns, thus, this `head`
560+ // will find the one and only column corresponding to the correct extractedField.
561+ extractedFields.collectFirst { case col if resolver(col.name, nestedExcept.head) =>
562+ col.toAttribute -> nestedExcept.tail
563+ }.get
564+ }
565+ Alias (CreateStruct (filterColumns(extractedFields.toSeq, newExcepts)), col.name)()
566+ // if there are multiple nestedExcepts but one is empty we must have overlapping except
567+ // columns. throw an error.
568+ case (col, Some (nestedExcepts)) if nestedExcepts.size > 1 =>
569+ throw new AnalysisException (
570+ errorClass = " EXCEPT_OVERLAPPING_COLUMNS" ,
571+ messageParameters = Map (
572+ " columns" -> this .excepts.map(_.mkString(" ." )).mkString(" , " )))
573+ }
574+ }
575+
576+ filterColumns(expandedCols, exceptPairs)
577+ }
578+ }
579+
580+ /**
581+ * Represents all of the input attributes to a given relational operator, for example in
582+ * "SELECT * FROM ...".
583+ *
584+ * This is also used to expand structs. For example:
585+ * "SELECT record.* from (SELECT struct(a,b,c) as record ...)
586+ *
587+ * @param target an optional name that should be the target of the expansion. If omitted all
588+ * targets' columns are produced. This can either be a table name or struct name. This
589+ * is a list of identifiers that is the path of the expansion.
590+ */
591+ case class UnresolvedStar (target : Option [Seq [String ]]) extends UnresolvedStarBase (target)
592+
447593/**
448594 * Represents all of the input attributes to a given relational operator, for example in
449595 * "SELECT `(id)?+.+` FROM ...".
0 commit comments