Skip to content

Commit ee9d56e

Browse files
viirya authored and Robert Kruszewski committed
[SPARK-23593][SQL] Add interpreted execution for InitializeJavaBean expression
## What changes were proposed in this pull request?

Add interpreted execution for `InitializeJavaBean` expression.

## How was this patch tested?

Added unit test.

Author: Liang-Chi Hsieh <[email protected]>

Closes apache#20985 from viirya/SPARK-23593-2.
1 parent 50cfc33 commit ee9d56e

File tree

3 files changed

+96
-6
lines changed

3 files changed

+96
-6
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1420,8 +1420,45 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
14201420
override def children: Seq[Expression] = beanInstance +: setters.values.toSeq
14211421
override def dataType: DataType = beanInstance.dataType
14221422

1423-
override def eval(input: InternalRow): Any =
1424-
throw new UnsupportedOperationException("Only code-generated evaluation is supported.")
1423+
/**
 * Pairs every configured setter with the reflective method that implements it.
 *
 * For each `(name, expression)` entry of `setters`, a single-argument method called `name`
 * is looked up among the methods declared on the bean class. The Java class(es) derived from
 * the value expression are tried first, followed by `java.lang.Object` so that generic
 * setters are matched as well. The first match is kept.
 *
 * Evaluated lazily, so the lookup (and any failure) happens on first access.
 * Throws `NoSuchMethodException` when no candidate parameter type yields a method.
 */
private lazy val resolvedSetters = {
  assert(beanInstance.dataType.isInstanceOf[ObjectType])

  val ObjectType(beanClass) = beanInstance.dataType

  // Returns the method `methodName(argClass)` declared on the bean class, if there is one.
  def declaredSetter(
      methodName: String,
      argClass: Class[_]): Option[java.lang.reflect.Method] = {
    try {
      Some(beanClass.getDeclaredMethod(methodName, argClass))
    } catch {
      case _: NoSuchMethodException => None
    }
  }

  setters.map { case (setterName, valueExpr) =>
    // Known type mapping first, then the general `Object`-typed parameter for generic methods.
    val candidateTypes =
      ScalaReflection.expressionJavaClasses(Seq(valueExpr)) ++ Seq(classOf[Object])
    val matches = candidateTypes.flatMap(argClass => declaredSetter(setterName, argClass))
    matches.headOption match {
      case Some(setter) =>
        setter -> valueExpr
      case None =>
        throw new NoSuchMethodException(s"""A method named "$setterName" is not declared """ +
          "in any enclosing class nor any supertype")
    }
  }
}
1446+
1447+
/**
 * Interpreted evaluation: evaluates the bean instance and, when it is non-null, invokes each
 * resolved setter with the value of its expression for the given input row.
 *
 * A setter is skipped when its value expression evaluates to null.
 *
 * @param input the row against which the bean and value expressions are evaluated
 * @return the evaluated bean instance, or null if the bean expression evaluated to null
 */
override def eval(input: InternalRow): Any = {
  val instance = beanInstance.eval(input)
  if (instance != null) {
    val target = instance.asInstanceOf[Object]
    for ((setter, valueExpr) <- resolvedSetters) {
      val argument = valueExpr.eval(input)
      // Null values are never passed to a setter.
      if (argument != null) {
        setter.invoke(target, argument.asInstanceOf[AnyRef])
      }
    }
  }
  instance
}
14251462

14261463
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
14271464
val instanceGen = beanInstance.genCode(ctx)
@@ -1434,7 +1471,9 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
14341471
val fieldGen = fieldValue.genCode(ctx)
14351472
s"""
14361473
|${fieldGen.code}
1437-
|$javaBeanInstance.$setterMethod(${fieldGen.value});
1474+
|if (!${fieldGen.isNull}) {
1475+
| $javaBeanInstance.$setterMethod(${fieldGen.value});
1476+
|}
14381477
""".stripMargin
14391478
}
14401479
val initializeCode = ctx.splitExpressionsWithCurrentInputs(

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
5555

5656
protected def checkEvaluation(
5757
expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
58-
val expr = prepareEvaluation(expression)
58+
// Make it a method so that a fresh expression is obtained every time.
59+
def expr = prepareEvaluation(expression)
5960
val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
6061
checkEvaluationWithoutCodegen(expr, catalystValue, inputRow)
6162
checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, inputRow)
@@ -111,12 +112,14 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
111112
val errMsg = intercept[T] {
112113
eval
113114
}.getMessage
114-
if (errMsg != expectedErrMsg) {
115+
if (!errMsg.contains(expectedErrMsg)) {
115116
fail(s"Expected error message is `$expectedErrMsg`, but `$errMsg` found")
116117
}
117118
}
118119
}
119-
val expr = prepareEvaluation(expression)
120+
121+
// Make it a method so that a fresh expression is obtained every time.
122+
def expr = prepareEvaluation(expression)
120123
checkException(evaluateWithoutCodegen(expr, inputRow), "non-codegen mode")
121124
checkException(evaluateWithGeneratedMutableProjection(expr, inputRow), "codegen mode")
122125
if (GenerateUnsafeProjection.canSupport(expr.dataType)) {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,46 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
192192
Invoke(funcSubObj, "binOp", DoubleType, inputSum), 0.75, InternalRow.apply(1, 0.25))
193193
}
194194

195+
test("SPARK-23593: InitializeJavaBean should support interpreted execution") {
  // Expected bean state: an initially empty list after `add(1)` has been applied.
  val list = new java.util.LinkedList[Int]()
  list.add(1)

  val initializeBean = InitializeJavaBean(Literal.fromObject(new java.util.LinkedList[Int]),
    Map("add" -> Literal(1)))
  checkEvaluation(initializeBean, list, InternalRow.fromSeq(Seq()))

  // A setter name that the bean class does not declare must surface as an error.
  val initializeWithNonexistingMethod = InitializeJavaBean(
    Literal.fromObject(new java.util.LinkedList[Int]),
    Map("nonexisting" -> Literal(1)))
  checkExceptionInExpression[Exception](initializeWithNonexistingMethod,
    InternalRow.fromSeq(Seq()),
    """A method named "nonexisting" is not declared in any enclosing class """ +
      "nor any supertype")

  // `setX` is declared with an Int parameter, so a string-typed value must not resolve to it.
  val initializeWithWrongParamType = InitializeJavaBean(
    Literal.fromObject(new TestBean),
    Map("setX" -> Literal("1")))
  val wrongParamTypeError = intercept[Exception] {
    evaluateWithoutCodegen(initializeWithWrongParamType, InternalRow.fromSeq(Seq()))
  }
  // The message check must be asserted; a bare `contains(...)` result would be discarded
  // and the test would pass regardless of the message.
  assert(wrongParamTypeError.getMessage.contains(
    """A method named "setX" is not declared in any enclosing class """ +
      "nor any supertype"))
}
220+
221+
test("InitializeJavaBean doesn't call setters if input is null") {
  // `TestBean.setNonPrimitive` asserts that its argument is non-null, so evaluation only
  // succeeds here if the setter is skipped for a null value, in both execution modes.
  val initializeBean = InitializeJavaBean(
    Literal.fromObject(new TestBean),
    Map("setNonPrimitive" -> Literal(null)))
  evaluateWithoutCodegen(initializeBean, InternalRow.fromSeq(Seq()))
  evaluateWithGeneratedMutableProjection(initializeBean, InternalRow.fromSeq(Seq()))

  // With a non-null value the setter is expected to be invoked without error.
  val initializeBean2 = InitializeJavaBean(
    Literal.fromObject(new TestBean),
    Map("setNonPrimitive" -> Literal("string")))
  evaluateWithoutCodegen(initializeBean2, InternalRow.fromSeq(Seq()))
  evaluateWithGeneratedMutableProjection(initializeBean2, InternalRow.fromSeq(Seq()))
}
234+
195235
test("SPARK-23585: UnwrapOption should support interpreted execution") {
196236
val cls = classOf[Option[Int]]
197237
val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
@@ -342,3 +382,11 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
342382
}
343383
}
344384
}
385+
386+
/**
 * Minimal serializable bean used by the `InitializeJavaBean` tests.
 *
 * It exposes one setter with a primitive parameter and one with a reference parameter;
 * the latter fails an assertion when it receives null.
 */
class TestBean extends Serializable {
  private var x: Int = 0

  /** Stores `i` in the bean's private field. */
  def setX(i: Int): Unit = {
    x = i
  }

  /** Accepts any non-null reference; a null argument fails the assertion. */
  def setNonPrimitive(i: AnyRef): Unit = {
    assert(i != null, "this setter should not be called with null.")
  }
}

0 commit comments

Comments
 (0)