Skip to content

Commit e134165

Browse files
maropu authored and hvanhovell committed
[SPARK-23588][SQL] CatalystToExternalMap should support interpreted execution
## What changes were proposed in this pull request?

This pr supported interpreted mode for `CatalystToExternalMap`.

## How was this patch tested?

Added tests in `ObjectExpressionsSuite`.

Author: Takeshi Yamamuro <[email protected]>

Closes apache#20979 from maropu/SPARK-23588.
1 parent 1b08c43 commit e134165

File tree

2 files changed

+63
-10
lines changed

2 files changed

+63
-10
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala

Lines changed: 35 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -28,12 +28,12 @@ import scala.util.Try
2828
import org.apache.spark.{SparkConf, SparkEnv}
2929
import org.apache.spark.serializer._
3030
import org.apache.spark.sql.Row
31-
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
31+
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
3232
import org.apache.spark.sql.catalyst.ScalaReflection.universe.TermName
3333
import org.apache.spark.sql.catalyst.encoders.RowEncoder
3434
import org.apache.spark.sql.catalyst.expressions._
3535
import org.apache.spark.sql.catalyst.expressions.codegen._
36-
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
36+
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
3737
import org.apache.spark.sql.types._
3838
import org.apache.spark.util.Utils
3939

@@ -1033,8 +1033,39 @@ case class CatalystToExternalMap private(
10331033
override def children: Seq[Expression] =
10341034
keyLambdaFunction :: valueLambdaFunction :: inputData :: Nil
10351035

1036-
override def eval(input: InternalRow): Any =
1037-
throw new UnsupportedOperationException("Only code-generated evaluation is supported")
1036+
private lazy val inputMapType = inputData.dataType.asInstanceOf[MapType]
1037+
1038+
private lazy val keyConverter =
1039+
CatalystTypeConverters.createToScalaConverter(inputMapType.keyType)
1040+
private lazy val valueConverter =
1041+
CatalystTypeConverters.createToScalaConverter(inputMapType.valueType)
1042+
1043+
private def newMapBuilder(): Builder[AnyRef, AnyRef] = {
1044+
val clazz = Utils.classForName(collClass.getCanonicalName + "$")
1045+
val module = clazz.getField("MODULE$").get(null)
1046+
val method = clazz.getMethod("newBuilder")
1047+
method.invoke(module).asInstanceOf[Builder[AnyRef, AnyRef]]
1048+
}
1049+
1050+
override def eval(input: InternalRow): Any = {
1051+
val result = inputData.eval(input).asInstanceOf[MapData]
1052+
if (result != null) {
1053+
val builder = newMapBuilder()
1054+
builder.sizeHint(result.numElements())
1055+
val keyArray = result.keyArray()
1056+
val valueArray = result.valueArray()
1057+
var i = 0
1058+
while (i < result.numElements()) {
1059+
val key = keyConverter(keyArray.get(i, inputMapType.keyType))
1060+
val value = valueConverter(valueArray.get(i, inputMapType.valueType))
1061+
builder += Tuple2(key, value)
1062+
i += 1
1063+
}
1064+
builder.result()
1065+
} else {
1066+
null
1067+
}
1068+
}
10381069

10391070
override def dataType: DataType = ObjectType(collClass)
10401071

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala

Lines changed: 28 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -27,12 +27,14 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
2727
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
2828
import org.apache.spark.sql.{RandomDataGenerator, Row}
2929
import org.apache.spark.sql.catalyst.InternalRow
30-
import org.apache.spark.sql.catalyst.analysis.ResolveTimeZone
31-
import org.apache.spark.sql.catalyst.encoders.{ExamplePointUDT, ExpressionEncoder, RowEncoder}
30+
import org.apache.spark.sql.catalyst.analysis.{ResolveTimeZone, SimpleAnalyzer, UnresolvedDeserializer}
31+
import org.apache.spark.sql.catalyst.dsl.expressions._
32+
import org.apache.spark.sql.catalyst.encoders._
3233
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
3334
import org.apache.spark.sql.catalyst.expressions.objects._
34-
import org.apache.spark.sql.catalyst.util._
35-
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{SQLDate, SQLTimestamp}
35+
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
36+
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
37+
import org.apache.spark.sql.catalyst.util.DateTimeUtils
3638
import org.apache.spark.sql.internal.SQLConf
3739
import org.apache.spark.sql.types._
3840
import org.apache.spark.unsafe.types.UTF8String
@@ -162,9 +164,10 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
162164
"fromPrimitiveArray", ObjectType(classOf[Array[Int]]),
163165
Array[Int](1, 2, 3), UnsafeArrayData.fromPrimitiveArray(Array[Int](1, 2, 3))),
164166
(DateTimeUtils.getClass, ObjectType(classOf[Date]),
165-
"toJavaDate", ObjectType(classOf[SQLDate]), 77777, DateTimeUtils.toJavaDate(77777)),
167+
"toJavaDate", ObjectType(classOf[DateTimeUtils.SQLDate]), 77777,
168+
DateTimeUtils.toJavaDate(77777)),
166169
(DateTimeUtils.getClass, ObjectType(classOf[Timestamp]),
167-
"toJavaTimestamp", ObjectType(classOf[SQLTimestamp]),
170+
"toJavaTimestamp", ObjectType(classOf[DateTimeUtils.SQLTimestamp]),
168171
88888888.toLong, DateTimeUtils.toJavaTimestamp(88888888))
169172
).foreach { case (cls, dataType, methodName, argType, arg, expected) =>
170173
checkObjectExprEvaluation(StaticInvoke(cls, dataType, methodName,
@@ -450,6 +453,25 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
450453
}
451454
}
452455
}
456+
457+
implicit private def mapIntStrEncoder = ExpressionEncoder[Map[Int, String]]()
458+
459+
test("SPARK-23588 CatalystToExternalMap should support interpreted execution") {
460+
// To get a resolved `CatalystToExternalMap` expression, we build a deserializer plan
461+
// with dummy input, resolve the plan by the analyzer, and replace the dummy input
462+
// with a literal for tests.
463+
val unresolvedDeser = UnresolvedDeserializer(encoderFor[Map[Int, String]].deserializer)
464+
val dummyInputPlan = LocalRelation('value.map(MapType(IntegerType, StringType)))
465+
val plan = Project(Alias(unresolvedDeser, "none")() :: Nil, dummyInputPlan)
466+
467+
val analyzedPlan = SimpleAnalyzer.execute(plan)
468+
val Alias(toMapExpr: CatalystToExternalMap, _) = analyzedPlan.expressions.head
469+
470+
// Replaces the dummy input with a literal for tests here
471+
val data = Map[Int, String](0 -> "v0", 1 -> "v1", 2 -> null, 3 -> "v3")
472+
val deserializer = toMapExpr.copy(inputData = Literal.create(data))
473+
checkObjectExprEvaluation(deserializer, expected = data)
474+
}
453475
}
454476

455477
class TestBean extends Serializable {

0 commit comments

Comments (0)