Skip to content

Commit 63ea877

Browse files
viiryaRobert Kruszewski
authored andcommitted
[SPARK-23593][SQL] Add interpreted execution for InitializeJavaBean expression
## What changes were proposed in this pull request? Add interpreted execution for `InitializeJavaBean` expression. ## How was this patch tested? Added unit test. Author: Liang-Chi Hsieh <[email protected]> Closes apache#20756 from viirya/SPARK-23593.
1 parent ff98d78 commit 63ea877

File tree

3 files changed

+103
-5
lines changed

3 files changed

+103
-5
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1410,8 +1410,47 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
14101410
override def children: Seq[Expression] = beanInstance +: setters.values.toSeq
14111411
override def dataType: DataType = beanInstance.dataType
14121412

1413-
override def eval(input: InternalRow): Any =
1414-
throw new UnsupportedOperationException("Only code-generated evaluation is supported.")
1413+
private lazy val resolvedSetters = {
1414+
assert(beanInstance.dataType.isInstanceOf[ObjectType])
1415+
1416+
val ObjectType(beanClass) = beanInstance.dataType
1417+
setters.map {
1418+
case (name, expr) =>
1419+
// Looking for known type mapping.
1420+
// But also looking for general `Object`-type parameter for generic methods.
1421+
val paramTypes = ScalaReflection.expressionJavaClasses(Seq(expr)) ++ Seq(classOf[Object])
1422+
val methods = paramTypes.flatMap { fieldClass =>
1423+
try {
1424+
Some(beanClass.getDeclaredMethod(name, fieldClass))
1425+
} catch {
1426+
case e: NoSuchMethodException => None
1427+
}
1428+
}
1429+
if (methods.isEmpty) {
1430+
throw new NoSuchMethodException(s"""A method named "$name" is not declared """ +
1431+
"in any enclosing class nor any supertype")
1432+
}
1433+
methods.head -> expr
1434+
}
1435+
}
1436+
1437+
override def eval(input: InternalRow): Any = {
1438+
val instance = beanInstance.eval(input)
1439+
if (instance != null) {
1440+
val bean = instance.asInstanceOf[Object]
1441+
resolvedSetters.foreach {
1442+
case (setter, expr) =>
1443+
val paramVal = expr.eval(input)
1444+
if (paramVal == null) {
1445+
throw new NullPointerException("The parameter value for setters in " +
1446+
"`InitializeJavaBean` can not be null")
1447+
} else {
1448+
setter.invoke(bean, paramVal.asInstanceOf[AnyRef])
1449+
}
1450+
}
1451+
}
1452+
instance
1453+
}
14151454

14161455
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
14171456
val instanceGen = beanInstance.genCode(ctx)
@@ -1424,6 +1463,10 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
14241463
val fieldGen = fieldValue.genCode(ctx)
14251464
s"""
14261465
|${fieldGen.code}
1466+
|if (${fieldGen.isNull}) {
1467+
| throw new NullPointerException("The parameter value for setters in " +
1468+
| "`InitializeJavaBean` can not be null");
1469+
|}
14271470
|$javaBeanInstance.$setterMethod(${fieldGen.value});
14281471
""".stripMargin
14291472
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
5555

5656
protected def checkEvaluation(
5757
expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
58-
val expr = prepareEvaluation(expression)
58+
// Make it as method to obtain fresh expression everytime.
59+
def expr = prepareEvaluation(expression)
5960
val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
6061
checkEvaluationWithoutCodegen(expr, catalystValue, inputRow)
6162
checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, inputRow)
@@ -111,12 +112,14 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
111112
val errMsg = intercept[T] {
112113
eval
113114
}.getMessage
114-
if (errMsg != expectedErrMsg) {
115+
if (!errMsg.contains(expectedErrMsg)) {
115116
fail(s"Expected error message is `$expectedErrMsg`, but `$errMsg` found")
116117
}
117118
}
118119
}
119-
val expr = prepareEvaluation(expression)
120+
121+
// Make it as method to obtain fresh expression everytime.
122+
def expr = prepareEvaluation(expression)
120123
checkException(evaluateWithoutCodegen(expr, inputRow), "non-codegen mode")
121124
checkException(evaluateWithGeneratedMutableProjection(expr, inputRow), "codegen mode")
122125
if (GenerateUnsafeProjection.canSupport(expr.dataType)) {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,50 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
128128
Invoke(funcSubObj, "binOp", DoubleType, inputSum), 0.75, InternalRow.apply(1, 0.25))
129129
}
130130

131+
test("SPARK-23593: InitializeJavaBean should support interpreted execution") {
132+
val list = new java.util.LinkedList[Int]()
133+
list.add(1)
134+
135+
val initializeBean = InitializeJavaBean(Literal.fromObject(new java.util.LinkedList[Int]),
136+
Map("add" -> Literal(1)))
137+
checkEvaluation(initializeBean, list, InternalRow.fromSeq(Seq()))
138+
139+
val initializeWithNonexistingMethod = InitializeJavaBean(
140+
Literal.fromObject(new java.util.LinkedList[Int]),
141+
Map("nonexisting" -> Literal(1)))
142+
checkExceptionInExpression[Exception](initializeWithNonexistingMethod,
143+
InternalRow.fromSeq(Seq()),
144+
"""A method named "nonexisting" is not declared in any enclosing class """ +
145+
"nor any supertype")
146+
147+
val initializeWithWrongParamType = InitializeJavaBean(
148+
Literal.fromObject(new TestBean),
149+
Map("setX" -> Literal("1")))
150+
intercept[Exception] {
151+
evaluateWithoutCodegen(initializeWithWrongParamType, InternalRow.fromSeq(Seq()))
152+
}.getMessage.contains(
153+
"""A method named "setX" is not declared in any enclosing class """ +
154+
"nor any supertype")
155+
}
156+
157+
test("Can not pass in null into setters in InitializeJavaBean") {
158+
val initializeBean = InitializeJavaBean(
159+
Literal.fromObject(new TestBean),
160+
Map("setNonPrimitive" -> Literal(null)))
161+
intercept[NullPointerException] {
162+
evaluateWithoutCodegen(initializeBean, InternalRow.fromSeq(Seq()))
163+
}.getMessage.contains("The parameter value for setters in `InitializeJavaBean` can not be null")
164+
intercept[NullPointerException] {
165+
evaluateWithGeneratedMutableProjection(initializeBean, InternalRow.fromSeq(Seq()))
166+
}.getMessage.contains("The parameter value for setters in `InitializeJavaBean` can not be null")
167+
168+
val initializeBean2 = InitializeJavaBean(
169+
Literal.fromObject(new TestBean),
170+
Map("setNonPrimitive" -> Literal("string")))
171+
evaluateWithoutCodegen(initializeBean2, InternalRow.fromSeq(Seq()))
172+
evaluateWithGeneratedMutableProjection(initializeBean2, InternalRow.fromSeq(Seq()))
173+
}
174+
131175
test("SPARK-23585: UnwrapOption should support interpreted execution") {
132176
val cls = classOf[Option[Int]]
133177
val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
@@ -278,3 +322,11 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
278322
}
279323
}
280324
}
325+
326+
class TestBean extends Serializable {
327+
private var x: Int = 0
328+
329+
def setX(i: Int): Unit = x = i
330+
def setNonPrimitive(i: AnyRef): Unit =
331+
assert(i != null, "this setter should not be called with null.")
332+
}

0 commit comments

Comments
 (0)