Skip to content

Commit 1b08c43

Browse files
maropuhvanhovell
authored andcommitted
[SPARK-23584][SQL] NewInstance should support interpreted execution
## What changes were proposed in this pull request? This pr supported interpreted mode for `NewInstance`. ## How was this patch tested? Added tests in `ObjectExpressionsSuite`. Author: Takeshi Yamamuro <[email protected]> Closes apache#20778 from maropu/SPARK-23584.
1 parent 46bb2b5 commit 1b08c43

File tree

3 files changed

+75
-2
lines changed

3 files changed

+75
-2
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717

1818
package org.apache.spark.sql.catalyst
1919

20+
import java.lang.reflect.Constructor
21+
22+
import org.apache.commons.lang3.reflect.ConstructorUtils
23+
2024
import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedAttribute, UnresolvedExtractValue}
2125
import org.apache.spark.sql.catalyst.expressions._
2226
import org.apache.spark.sql.catalyst.expressions.objects._
@@ -781,6 +785,15 @@ object ScalaReflection extends ScalaReflection {
781785
}
782786
}
783787

788+
/**
789+
* Finds an accessible constructor with compatible parameters. This is a more flexible search
790+
* than the exact matching algorithm in `Class.getConstructor`. The first assignment-compatible
791+
* matching constructor is returned. Otherwise, it returns `None`.
792+
*/
793+
def findConstructor(cls: Class[_], paramTypes: Seq[Class[_]]): Option[Constructor[_]] = {
794+
Option(ConstructorUtils.getMatchingAccessibleConstructor(cls, paramTypes: _*))
795+
}
796+
784797
/**
785798
* Whether the fields of the given type is defined entirely by its constructor parameters.
786799
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -449,8 +449,32 @@ case class NewInstance(
449449
childrenResolved && !needOuterPointer
450450
}
451451

452-
override def eval(input: InternalRow): Any =
453-
throw new UnsupportedOperationException("Only code-generated evaluation is supported.")
452+
@transient private lazy val constructor: (Seq[AnyRef]) => Any = {
453+
val paramTypes = ScalaReflection.expressionJavaClasses(arguments)
454+
val getConstructor = (paramClazz: Seq[Class[_]]) => {
455+
ScalaReflection.findConstructor(cls, paramClazz).getOrElse {
456+
sys.error(s"Couldn't find a valid constructor on $cls")
457+
}
458+
}
459+
outerPointer.map { p =>
460+
val outerObj = p()
461+
val d = outerObj.getClass +: paramTypes
462+
val c = getConstructor(outerObj.getClass +: paramTypes)
463+
(args: Seq[AnyRef]) => {
464+
c.newInstance(outerObj +: args: _*)
465+
}
466+
}.getOrElse {
467+
val c = getConstructor(paramTypes)
468+
(args: Seq[AnyRef]) => {
469+
c.newInstance(args: _*)
470+
}
471+
}
472+
}
473+
474+
override def eval(input: InternalRow): Any = {
475+
val argValues = arguments.map(_.eval(input))
476+
constructor(argValues.map(_.asInstanceOf[AnyRef]))
477+
}
454478

455479
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
456480
val javaType = CodeGenerator.javaType(dataType)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,20 @@ class InvokeTargetSubClass extends InvokeTargetClass {
4747
override def binOp(e1: Int, e2: Double): Double = e1 - e2
4848
}
4949

50+
// Tests for NewInstance
51+
class Outer extends Serializable {
52+
class Inner(val value: Int) {
53+
override def hashCode(): Int = super.hashCode()
54+
override def equals(other: Any): Boolean = {
55+
if (other.isInstanceOf[Inner]) {
56+
value == other.asInstanceOf[Inner].value
57+
} else {
58+
false
59+
}
60+
}
61+
}
62+
}
63+
5064
class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
5165

5266
test("SPARK-16622: The returned value of the called method in Invoke can be null") {
@@ -383,6 +397,27 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
383397
}
384398
}
385399

400+
test("SPARK-23584 NewInstance should support interpreted execution") {
401+
// Normal case test
402+
val newInst1 = NewInstance(
403+
cls = classOf[GenericArrayData],
404+
arguments = Literal.fromObject(List(1, 2, 3)) :: Nil,
405+
propagateNull = false,
406+
dataType = ArrayType(IntegerType),
407+
outerPointer = None)
408+
checkObjectExprEvaluation(newInst1, new GenericArrayData(List(1, 2, 3)))
409+
410+
// Inner class case test
411+
val outerObj = new Outer()
412+
val newInst2 = NewInstance(
413+
cls = classOf[outerObj.Inner],
414+
arguments = Literal(1) :: Nil,
415+
propagateNull = false,
416+
dataType = ObjectType(classOf[outerObj.Inner]),
417+
outerPointer = Some(() => outerObj))
418+
checkObjectExprEvaluation(newInst2, new outerObj.Inner(1))
419+
}
420+
386421
test("LambdaVariable should support interpreted execution") {
387422
def genSchema(dt: DataType): Seq[StructType] = {
388423
Seq(StructType(StructField("col_1", dt, nullable = false) :: Nil),
@@ -421,6 +456,7 @@ class TestBean extends Serializable {
421456
private var x: Int = 0
422457

423458
def setX(i: Int): Unit = x = i
459+
424460
def setNonPrimitive(i: AnyRef): Unit =
425461
assert(i != null, "this setter should not be called with null.")
426462
}

0 commit comments

Comments
 (0)