Skip to content

Commit afbdf42

Browse files
maropuhvanhovell
authored andcommitted
[SPARK-23589][SQL] ExternalMapToCatalyst should support interpreted execution
## What changes were proposed in this pull request? This pr supported interpreted mode for `ExternalMapToCatalyst`. ## How was this patch tested? Added tests in `ObjectExpressionsSuite`. Author: Takeshi Yamamuro <[email protected]> Closes apache#20980 from maropu/SPARK-23589.
1 parent d87d30e commit afbdf42

File tree

2 files changed

+165
-3
lines changed

2 files changed

+165
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1255,8 +1255,64 @@ case class ExternalMapToCatalyst private(
12551255
override def dataType: MapType = MapType(
12561256
keyConverter.dataType, valueConverter.dataType, valueContainsNull = valueConverter.nullable)
12571257

1258-
override def eval(input: InternalRow): Any =
1259-
throw new UnsupportedOperationException("Only code-generated evaluation is supported")
1258+
private lazy val mapCatalystConverter: Any => (Array[Any], Array[Any]) = child.dataType match {
1259+
case ObjectType(cls) if classOf[java.util.Map[_, _]].isAssignableFrom(cls) =>
1260+
(input: Any) => {
1261+
val data = input.asInstanceOf[java.util.Map[Any, Any]]
1262+
val keys = new Array[Any](data.size)
1263+
val values = new Array[Any](data.size)
1264+
val iter = data.entrySet().iterator()
1265+
var i = 0
1266+
while (iter.hasNext) {
1267+
val entry = iter.next()
1268+
val (key, value) = (entry.getKey, entry.getValue)
1269+
keys(i) = if (key != null) {
1270+
keyConverter.eval(InternalRow.fromSeq(key :: Nil))
1271+
} else {
1272+
throw new RuntimeException("Cannot use null as map key!")
1273+
}
1274+
values(i) = if (value != null) {
1275+
valueConverter.eval(InternalRow.fromSeq(value :: Nil))
1276+
} else {
1277+
null
1278+
}
1279+
i += 1
1280+
}
1281+
(keys, values)
1282+
}
1283+
1284+
case ObjectType(cls) if classOf[scala.collection.Map[_, _]].isAssignableFrom(cls) =>
1285+
(input: Any) => {
1286+
val data = input.asInstanceOf[scala.collection.Map[Any, Any]]
1287+
val keys = new Array[Any](data.size)
1288+
val values = new Array[Any](data.size)
1289+
var i = 0
1290+
for ((key, value) <- data) {
1291+
keys(i) = if (key != null) {
1292+
keyConverter.eval(InternalRow.fromSeq(key :: Nil))
1293+
} else {
1294+
throw new RuntimeException("Cannot use null as map key!")
1295+
}
1296+
values(i) = if (value != null) {
1297+
valueConverter.eval(InternalRow.fromSeq(value :: Nil))
1298+
} else {
1299+
null
1300+
}
1301+
i += 1
1302+
}
1303+
(keys, values)
1304+
}
1305+
}
1306+
1307+
override def eval(input: InternalRow): Any = {
1308+
val result = child.eval(input)
1309+
if (result != null) {
1310+
val (keys, values) = mapCatalystConverter(result)
1311+
new ArrayBasedMapData(new GenericArrayData(keys), new GenericArrayData(values))
1312+
} else {
1313+
null
1314+
}
1315+
}
12601316

12611317
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
12621318
val inputMap = child.genCode(ctx)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ import java.sql.{Date, Timestamp}
2121

2222
import scala.collection.JavaConverters._
2323
import scala.reflect.ClassTag
24+
import scala.reflect.runtime.universe.TypeTag
2425
import scala.util.Random
2526

2627
import org.apache.spark.{SparkConf, SparkFunSuite}
2728
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
2829
import org.apache.spark.sql.{RandomDataGenerator, Row}
29-
import org.apache.spark.sql.catalyst.InternalRow
30+
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, JavaTypeInference, ScalaReflection}
3031
import org.apache.spark.sql.catalyst.analysis.{ResolveTimeZone, SimpleAnalyzer, UnresolvedDeserializer}
3132
import org.apache.spark.sql.catalyst.dsl.expressions._
3233
import org.apache.spark.sql.catalyst.encoders._
@@ -501,6 +502,111 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
501502
InternalRow.fromSeq(Seq(Row(1))),
502503
"java.lang.Integer is not a valid external type for schema of double")
503504
}
505+
506+
private def javaMapSerializerFor(
507+
keyClazz: Class[_],
508+
valueClazz: Class[_])(inputObject: Expression): Expression = {
509+
510+
def kvSerializerFor(inputObject: Expression, clazz: Class[_]): Expression = clazz match {
511+
case c if c == classOf[java.lang.Integer] =>
512+
Invoke(inputObject, "intValue", IntegerType)
513+
case c if c == classOf[java.lang.String] =>
514+
StaticInvoke(
515+
classOf[UTF8String],
516+
StringType,
517+
"fromString",
518+
inputObject :: Nil,
519+
returnNullable = false)
520+
}
521+
522+
ExternalMapToCatalyst(
523+
inputObject,
524+
ObjectType(keyClazz),
525+
kvSerializerFor(_, keyClazz),
526+
keyNullable = true,
527+
ObjectType(valueClazz),
528+
kvSerializerFor(_, valueClazz),
529+
valueNullable = true
530+
)
531+
}
532+
533+
private def scalaMapSerializerFor[T: TypeTag, U: TypeTag](inputObject: Expression): Expression = {
534+
import org.apache.spark.sql.catalyst.ScalaReflection._
535+
536+
val curId = new java.util.concurrent.atomic.AtomicInteger()
537+
538+
def kvSerializerFor[V: TypeTag](inputObject: Expression): Expression =
539+
localTypeOf[V].dealias match {
540+
case t if t <:< localTypeOf[java.lang.Integer] =>
541+
Invoke(inputObject, "intValue", IntegerType)
542+
case t if t <:< localTypeOf[String] =>
543+
StaticInvoke(
544+
classOf[UTF8String],
545+
StringType,
546+
"fromString",
547+
inputObject :: Nil,
548+
returnNullable = false)
549+
case _ =>
550+
inputObject
551+
}
552+
553+
ExternalMapToCatalyst(
554+
inputObject,
555+
dataTypeFor[T],
556+
kvSerializerFor[T],
557+
keyNullable = !localTypeOf[T].typeSymbol.asClass.isPrimitive,
558+
dataTypeFor[U],
559+
kvSerializerFor[U],
560+
valueNullable = !localTypeOf[U].typeSymbol.asClass.isPrimitive
561+
)
562+
}
563+
564+
test("SPARK-23589 ExternalMapToCatalyst should support interpreted execution") {
565+
// Simple test
566+
val scalaMap = scala.collection.Map[Int, String](0 -> "v0", 1 -> "v1", 2 -> null, 3 -> "v3")
567+
val javaMap = new java.util.HashMap[java.lang.Integer, java.lang.String]() {
568+
{
569+
put(0, "v0")
570+
put(1, "v1")
571+
put(2, null)
572+
put(3, "v3")
573+
}
574+
}
575+
val expected = CatalystTypeConverters.convertToCatalyst(scalaMap)
576+
577+
// Java Map
578+
val serializer1 = javaMapSerializerFor(classOf[java.lang.Integer], classOf[java.lang.String])(
579+
Literal.fromObject(javaMap))
580+
checkEvaluation(serializer1, expected)
581+
582+
// Scala Map
583+
val serializer2 = scalaMapSerializerFor[Int, String](Literal.fromObject(scalaMap))
584+
checkEvaluation(serializer2, expected)
585+
586+
// NULL key test
587+
val scalaMapHasNullKey = scala.collection.Map[java.lang.Integer, String](
588+
null.asInstanceOf[java.lang.Integer] -> "v0", new java.lang.Integer(1) -> "v1")
589+
val javaMapHasNullKey = new java.util.HashMap[java.lang.Integer, java.lang.String]() {
590+
{
591+
put(null, "v0")
592+
put(1, "v1")
593+
}
594+
}
595+
596+
// Java Map
597+
val serializer3 =
598+
javaMapSerializerFor(classOf[java.lang.Integer], classOf[java.lang.String])(
599+
Literal.fromObject(javaMapHasNullKey))
600+
checkExceptionInExpression[RuntimeException](
601+
serializer3, EmptyRow, "Cannot use null as map key!")
602+
603+
// Scala Map
604+
val serializer4 = scalaMapSerializerFor[java.lang.Integer, String](
605+
Literal.fromObject(scalaMapHasNullKey))
606+
607+
checkExceptionInExpression[RuntimeException](
608+
serializer4, EmptyRow, "Cannot use null as map key!")
609+
}
504610
}
505611

506612
class TestBean extends Serializable {

0 commit comments

Comments
 (0)