11package org .apache .spark .sql .rf
22
33import java .lang .reflect .Constructor
4-
5- import org .apache .spark .sql .AnalysisException
6- import org .apache .spark .sql .catalyst .FunctionIdentifier
7- import org .apache .spark .sql .catalyst .analysis .FunctionRegistry
8- import org .apache .spark .sql .catalyst .analysis .FunctionRegistry .FunctionBuilder
4+ import org .apache .spark .sql .catalyst .analysis .{FunctionRegistry , FunctionRegistryBase }
5+ import org .apache .spark .sql .catalyst .analysis .FunctionRegistry .{FUNC_ALIAS , FunctionBuilder }
96import org .apache .spark .sql .catalyst .catalog .CatalogTable
107import org .apache .spark .sql .catalyst .expressions .objects .{Invoke , InvokeLike }
11- import org .apache .spark .sql .catalyst .expressions .{AttributeReference , Expression , ExpressionDescription , ExpressionInfo }
8+ import org .apache .spark .sql .catalyst .expressions .{AttributeReference , Expression , ExpressionInfo }
129import org .apache .spark .sql .catalyst .plans .logical .LogicalPlan
1310import org .apache .spark .sql .execution .datasources .LogicalRelation
1411import org .apache .spark .sql .sources .BaseRelation
1512import org .apache .spark .sql .types .DataType
1613
1714import scala .reflect ._
18- import scala .util .{Failure , Success , Try }
1915
2016/**
2117 * Collection of Spark version compatibility adapters.
@@ -27,18 +23,6 @@ object VersionShims {
2723 val lrClazz = classOf [LogicalRelation ]
2824 val ctor = lrClazz.getConstructors.head.asInstanceOf [Constructor [LogicalRelation ]]
2925 ctor.getParameterTypes.length match {
30- // In Spark 2.1.0 the signature looks like this:
31- //
32- // case class LogicalRelation(
33- // relation: BaseRelation,
34- // expectedOutputAttributes: Option[Seq[Attribute]] = None,
35- // catalogTable: Option[CatalogTable] = None)
36- // extends LeafNode with MultiInstanceRelation
37- // In Spark 2.2.0 it's like this:
38- // case class LogicalRelation(
39- // relation: BaseRelation,
40- // output: Seq[AttributeReference],
41- // catalogTable: Option[CatalogTable])
4226 case 3 =>
4327 val arg2 : Seq [AttributeReference ] = lr.output
4428 val arg3 : Option [CatalogTable ] = lr.catalogTable
@@ -49,14 +33,6 @@ object VersionShims {
4933 ctor.newInstance(base, arg2, arg3)
5034 }
5135
52- // In Spark 2.3.0 this signature is this:
53- //
54- // case class LogicalRelation(
55- // relation: BaseRelation,
56- // output: Seq[AttributeReference],
57- // catalogTable: Option[CatalogTable],
58- // override val isStreaming: Boolean)
59- // extends LeafNode with MultiInstanceRelation {
6036 case 4 =>
6137 val arg2 : Seq [AttributeReference ] = lr.output
6238 val arg3 : Option [CatalogTable ] = lr.catalogTable
@@ -75,25 +51,8 @@ object VersionShims {
7551 val ctor = classOf [Invoke ].getConstructors.head
7652 val TRUE = Boolean .box(true )
7753 ctor.getParameterTypes.length match {
78- // In Spark 2.1.0 the signature looks like this:
79- //
80- // case class Invoke(
81- // targetObject: Expression,
82- // functionName: String,
83- // dataType: DataType,
84- // arguments: Seq[Expression] = Nil,
85- // propagateNull: Boolean = true) extends InvokeLike
8654 case 5 =>
8755 ctor.newInstance(targetObject, functionName, dataType, Nil , TRUE ).asInstanceOf [InvokeLike ]
88- // In spark 2.2.0 the signature looks like this:
89- //
90- // case class Invoke(
91- // targetObject: Expression,
92- // functionName: String,
93- // dataType: DataType,
94- // arguments: Seq[Expression] = Nil,
95- // propagateNull: Boolean = true,
96- // returnNullable : Boolean = true) extends InvokeLike
9756 case 6 =>
9857 ctor.newInstance(targetObject, functionName, dataType, Nil , TRUE , TRUE ).asInstanceOf [InvokeLike ]
9958
@@ -125,68 +84,18 @@ object VersionShims {
12584 }
12685 }
12786
128- // Much of the code herein is copied from org.apache.spark.sql.catalyst.analysis.FunctionRegistry
129- def registerExpression [T <: Expression : ClassTag ](name : String ): Unit = {
130- val clazz = classTag[T ].runtimeClass
131-
132- def expressionInfo : ExpressionInfo = {
133- val df = clazz.getAnnotation(classOf [ExpressionDescription ])
134- if (df != null ) {
135- if (df.extended().isEmpty) {
136- new ExpressionInfo (clazz.getCanonicalName, null , name, df.usage(), df.arguments(), df.examples(), df.note(), df.group(), df.since(), df.deprecated())
137- } else {
138- // This exists for the backward compatibility with old `ExpressionDescription`s defining
139- // the extended description in `extended()`.
140- new ExpressionInfo (clazz.getCanonicalName, null , name, df.usage(), df.extended())
141- }
142- } else {
143- new ExpressionInfo (clazz.getCanonicalName, name)
144- }
87+ def registerExpression [T <: Expression : ClassTag ](
88+ name : String ,
89+ setAlias : Boolean = false ,
90+ since : Option [String ] = None
91+ ): (String , (ExpressionInfo , FunctionBuilder )) = {
92+ val (expressionInfo, builder) = FunctionRegistryBase .build[T ](name, since)
93+ val newBuilder = (expressions : Seq [Expression ]) => {
94+ val expr = builder(expressions)
95+ if (setAlias) expr.setTagValue(FUNC_ALIAS , name)
96+ expr
14597 }
146- def findBuilder : FunctionBuilder = {
147- val constructors = clazz.getConstructors
148- // See if we can find a constructor that accepts Seq[Expression]
149- val varargCtor = constructors.find(_.getParameterTypes.toSeq == Seq (classOf [Seq [_]]))
150- val builder = (expressions : Seq [Expression ]) => {
151- if (varargCtor.isDefined) {
152- // If there is an apply method that accepts Seq[Expression], use that one.
153- Try (varargCtor.get.newInstance(expressions).asInstanceOf [Expression ]) match {
154- case Success (e) => e
155- case Failure (e) =>
156- // the exception is an invocation exception. To get a meaningful message, we need the
157- // cause.
158- throw new AnalysisException (e.getCause.getMessage)
159- }
160- } else {
161- // Otherwise, find a constructor method that matches the number of arguments, and use that.
162- val params = Seq .fill(expressions.size)(classOf [Expression ])
163- val f = constructors.find(_.getParameterTypes.toSeq == params).getOrElse {
164- val validParametersCount = constructors
165- .filter(_.getParameterTypes.forall(_ == classOf [Expression ]))
166- .map(_.getParameterCount).distinct.sorted
167- val expectedNumberOfParameters = if (validParametersCount.length == 1 ) {
168- validParametersCount.head.toString
169- } else {
170- validParametersCount.init.mkString(" one of " , " , " , " and " ) +
171- validParametersCount.last
172- }
173- throw new AnalysisException (s " Invalid number of arguments for function ${clazz.getSimpleName}. " +
174- s " Expected: $expectedNumberOfParameters; Found: ${params.length}" )
175- }
176- Try (f.newInstance(expressions : _* ).asInstanceOf [Expression ]) match {
177- case Success (e) => e
178- case Failure (e) =>
179- // the exception is an invocation exception. To get a meaningful message, we need the
180- // cause.
181- throw new AnalysisException (e.getCause.getMessage)
182- }
183- }
184- }
185-
186- builder
187- }
188-
189- registry.registerFunction(FunctionIdentifier (name), expressionInfo, findBuilder)
98+ (name, (expressionInfo, newBuilder))
19099 }
191100 }
192101}
0 commit comments